use datafusion::{
    catalog::{CatalogProviderList, TableFunctionImpl},
    config::{CatalogOptions, ConfigOptions},
    datasource::{provider_as_source, TableProvider, ViewTable},
    prelude::{flatten, get_field, make_array},
};
use datafusion_common::{
    alias::AliasGenerator,
    internal_err,
    tree_node::{Transformed, TreeNode},
    DFSchema, DataFusionError, Result, ScalarValue,
};
use datafusion_expr::{
    col, lit, utils::split_conjunction, Expr, LogicalPlan, LogicalPlanBuilder, TableScan,
};
use datafusion_functions::string::expr_fn::{concat, concat_ws};
use datafusion_sql::TableReference;
use itertools::{Either, Itertools};
use std::{collections::HashSet, sync::Arc};

use crate::materialized::META_COLUMN;

use super::{cast_to_materialized, row_metadata::RowMetadataRegistry, util, Materialized};

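/// Returns a UDTF that takes the name of a materialized view and returns one row per
/// (target partition, source file) pair, describing which source files each partition
/// of the view depends on. Intended to be registered with `SessionContext::register_udtf`,
/// e.g. under the name `mv_dependencies`:
/// `SELECT * FROM mv_dependencies('my_materialized_view')`.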
pub fn mv_dependencies(
    catalog_list: Arc<dyn CatalogProviderList>,
    row_metadata_registry: Arc<RowMetadataRegistry>,
    options: &ConfigOptions,
) -> Arc<dyn TableFunctionImpl + 'static> {
    Arc::new(FileDependenciesUdtf::new(
        catalog_list,
        row_metadata_registry,
        options,
    ))
}

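/// Table function backing [`mv_dependencies`].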
#[derive(Debug)]
struct FileDependenciesUdtf {
    catalog_list: Arc<dyn CatalogProviderList>,
    row_metadata_registry: Arc<RowMetadataRegistry>,
    config_options: ConfigOptions,
}

impl FileDependenciesUdtf {
    fn new(
        catalog_list: Arc<dyn CatalogProviderList>,
        row_metadata_registry: Arc<RowMetadataRegistry>,
        config_options: &ConfigOptions,
    ) -> Self {
        Self {
            catalog_list,
            config_options: config_options.clone(),
            row_metadata_registry,
        }
    }
}

impl TableFunctionImpl for FileDependenciesUdtf {
    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        let table_name = get_table_name(args)?;

        let table_ref = TableReference::from(table_name).resolve(
            &self.config_options.catalog.default_catalog,
            &self.config_options.catalog.default_schema,
        );

        let table = util::get_table(self.catalog_list.as_ref(), &table_ref)
            .map_err(|e| DataFusionError::Plan(e.to_string()))?;

        let mv = cast_to_materialized(table.as_ref())?.ok_or(DataFusionError::Plan(format!(
            "mv_dependencies: table '{table_name}' is not a materialized view \
             (Materialized TableProviders must be registered using register_materialized)"
        )))?;

        Ok(Arc::new(ViewTable::new(
            mv_dependencies_plan(
                mv,
                self.row_metadata_registry.as_ref(),
                &self.config_options,
            )?,
            None,
        )))
    }
}

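/// Returns a UDTF that joins the output of [`mv_dependencies`] against the file
/// metadata of the materialized view itself and reports, for each target partition,
/// the last-modified times of its newest source and target files and whether the
/// partition is out of date (`is_stale`).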
pub fn stale_files(
    catalog_list: Arc<dyn CatalogProviderList>,
    row_metadata_registry: Arc<RowMetadataRegistry>,
    file_metadata: Arc<dyn TableProvider>,
    config_options: &ConfigOptions,
) -> Arc<dyn TableFunctionImpl + 'static> {
    Arc::new(StaleFilesUdtf {
        mv_dependencies: FileDependenciesUdtf {
            catalog_list,
            row_metadata_registry,
            config_options: config_options.clone(),
        },
        file_metadata,
    })
}

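/// Table function backing [`stale_files`].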
#[derive(Debug)]
struct StaleFilesUdtf {
    mv_dependencies: FileDependenciesUdtf,
    file_metadata: Arc<dyn TableProvider>,
}

impl TableFunctionImpl for StaleFilesUdtf {
    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
        use datafusion::prelude::*;
        use datafusion_functions_aggregate::min_max::max;

        let dependencies = provider_as_source(self.mv_dependencies.call(args)?);

        let table_name = get_table_name(args)?;

        let table_ref = TableReference::from(table_name).resolve(
            &self.mv_dependencies.config_options.catalog.default_catalog,
            &self.mv_dependencies.config_options.catalog.default_schema,
        );

        let table = util::get_table(self.mv_dependencies.catalog_list.as_ref(), &table_ref)
            .map_err(|e| DataFusionError::Plan(e.to_string()))?;
        let mv = cast_to_materialized(table.as_ref())?.ok_or(DataFusionError::Plan(format!(
            "stale_files: table '{table_name}' is not a materialized view \
             (Materialized TableProviders must be registered using register_materialized)"
        )))?;

        let url = mv.table_paths()[0].to_string();
        let num_static_partition_cols = mv.static_partition_columns().len();

        let logical_plan =
            LogicalPlanBuilder::scan_with_filters("dependencies", dependencies, None, vec![])?
                .aggregate(
                    vec![col("dependencies.target").alias("expected_target")],
                    vec![max(col("source_last_modified")).alias("sources_last_modified")],
                )?
                .join(
                    LogicalPlanBuilder::scan_with_filters(
                        "file_metadata",
                        provider_as_source(
                            Arc::clone(&self.file_metadata) as Arc<dyn TableProvider>
                        ),
                        None,
                        vec![
                            col("table_catalog").eq(lit(table_ref.catalog.as_ref())),
                            col("table_schema").eq(lit(table_ref.schema.as_ref())),
                            col("table_name").eq(lit(table_ref.table.as_ref())),
                        ],
                    )?
                    .aggregate(
                        vec![
                            // Recover the target partition directory from each file path:
                            // the table URL followed by `num_static_partition_cols`
                            // Hive-style `key=value/` segments.
                            array_element(
                                regexp_match(
                                    col("file_path"),
                                    lit(format!(
                                        "{url}(?:[^/=]+=[^/]+/){{{num_static_partition_cols}}}"
                                    )),
                                    None,
                                ),
                                lit(1),
                            )
                            .alias("existing_target"),
                        ],
                        vec![max(col("last_modified")).alias("target_last_modified")],
                    )?
                    .project(vec![col("existing_target"), col("target_last_modified")])?
                    .build()?,
                    JoinType::Left,
                    (vec!["expected_target"], vec!["existing_target"]),
                    None,
                )?
                .project(vec![
                    col("expected_target").alias("target"),
                    col("target_last_modified"),
                    col("sources_last_modified"),
                    nvl(
                        col("target_last_modified"),
                        lit(ScalarValue::TimestampNanosecond(
                            Some(0),
                            Some(Arc::from("UTC")),
                        )),
                    )
                    .lt(col("sources_last_modified"))
                    .alias("is_stale"),
                ])?
                .build()?;

        Ok(Arc::new(ViewTable::new(logical_plan, None)))
    }
}

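/// Extracts the table name from the UDTF argument list, which is expected to begin
/// with a string literal.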
fn get_table_name(args: &[Expr]) -> Result<&String> {
    match args.first() {
        Some(Expr::Literal(ScalarValue::Utf8(Some(table_name)), _)) => Ok(table_name),
        _ => Err(DataFusionError::Plan(
            "expected a single string literal argument to mv_dependencies".to_string(),
        )),
    }
}

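/// Builds the logical plan returned by `mv_dependencies`: prune the materialized
/// view's query down to its static partition columns, swap table scans for row
/// metadata scans, then unnest the accumulated file metadata into one row per
/// (target partition, source file).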
#[cfg_attr(doc, aquamarine::aquamarine)]
pub fn mv_dependencies_plan(
    materialized_view: &dyn Materialized,
    row_metadata_registry: &RowMetadataRegistry,
    config_options: &ConfigOptions,
) -> Result<LogicalPlan> {
    use datafusion_expr::logical_plan::*;

    let plan = materialized_view.query().clone();

    let static_partition_cols = materialized_view.static_partition_columns();
    let static_partition_col_indices = plan
        .schema()
        .fields()
        .iter()
        .enumerate()
        .filter_map(|(i, f)| static_partition_cols.contains(f.name()).then_some(i))
        .collect();

    let pruned_plan_with_source_files = if static_partition_cols.is_empty() {
        get_source_files_all_partitions(
            materialized_view,
            &config_options.catalog,
            row_metadata_registry,
        )
    } else {
        let pruned_plan = pushdown_projection_inexact(plan, &static_partition_col_indices)?;

        push_up_file_metadata(pruned_plan, &config_options.catalog, row_metadata_registry)
    }?;

    let files = pruned_plan_with_source_files
        .schema()
        .columns()
        .into_iter()
        .find(|c| c.name.starts_with(META_COLUMN))
        .ok_or_else(|| DataFusionError::Plan(format!("Plan contains no {META_COLUMN} column")))?;
    let files_col = Expr::Column(files.clone());

    LogicalPlanBuilder::from(pruned_plan_with_source_files)
        .unnest_column(files)?
        .project(vec![
            construct_target_path_from_static_partition_columns(materialized_view).alias("target"),
            get_field(files_col.clone(), "table_catalog").alias("source_table_catalog"),
            get_field(files_col.clone(), "table_schema").alias("source_table_schema"),
            get_field(files_col.clone(), "table_name").alias("source_table_name"),
            get_field(files_col.clone(), "source_uri").alias("source_uri"),
            get_field(files_col.clone(), "last_modified").alias("source_last_modified"),
        ])?
        .distinct()?
        .build()
}

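/// Builds an expression for the output directory of each partition, in Hive style:
/// `{table_path}/{col}={value}/.../`; with no static partition columns this is just
/// the table path with a trailing slash.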
fn construct_target_path_from_static_partition_columns(
    materialized_view: &dyn Materialized,
) -> Expr {
    let table_path = lit(materialized_view.table_paths()[0]
        .as_str()
        .trim_end_matches("/"));

    let mut hive_column_path_elements = materialized_view
        .static_partition_columns()
        .iter()
        .map(|column_name| concat([lit(column_name.as_str()), lit("="), col(column_name)].to_vec()))
        .collect::<Vec<_>>();
    hive_column_path_elements.insert(0, table_path);

    concat(vec![
        if hive_column_path_elements.len() == 1 {
            hive_column_path_elements.pop().unwrap()
        } else {
            concat_ws(lit("/"), hive_column_path_elements)
        },
        lit("/"),
    ])
}

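/// An "inexact" form of projection pushdown: rewrites `plan` so that it outputs only
/// the columns at `indices`, but may return more rows than an exact projection would,
/// since filters, join conditions, limits and sorts that touch none of the requested
/// columns are widened or dropped. That over-approximation is acceptable here because
/// the result is only used to enumerate candidate file dependencies.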
fn pushdown_projection_inexact(plan: LogicalPlan, indices: &HashSet<usize>) -> Result<LogicalPlan> {
    use datafusion_expr::logical_plan::*;

    let plan_formatted = format!("{}", plan.display());
    match plan {
        LogicalPlan::Projection(Projection { expr, input, .. }) => {
            let new_exprs = expr
                .into_iter()
                .enumerate()
                .filter_map(|(i, expr)| indices.contains(&i).then_some(expr))
                .collect_vec();

            let child_indices = new_exprs
                .iter()
                .flat_map(|e| e.column_refs().into_iter())
                .map(|c| input.schema().index_of_column(c).unwrap())
                .collect::<HashSet<_>>();

            Projection::try_new(
                new_exprs,
                pushdown_projection_inexact(Arc::unwrap_or_clone(input), &child_indices)
                    .map(Arc::new)?,
            )
            .map(LogicalPlan::Projection)
        }
        LogicalPlan::Filter(ref filter) => {
            let mut indices = indices.clone();

            let new_filter = widen_filter(&filter.predicate, &mut indices, &plan)?;

            let filter = match plan {
                LogicalPlan::Filter(filter) => filter,
                _ => unreachable!(),
            };

            Filter::try_new(
                new_filter,
                pushdown_projection_inexact(Arc::unwrap_or_clone(filter.input), &indices)
                    .map(Arc::new)?,
            )
            .map(LogicalPlan::Filter)
        }
        LogicalPlan::Window(Window {
            input,
            window_expr: _,
            ..
        }) => {
            let num_non_window_cols = input.schema().fields().len();
            if indices.iter().any(|&i| i >= num_non_window_cols) {
                return internal_err!("Can't push down projection through window functions");
            }

            pushdown_projection_inexact(Arc::unwrap_or_clone(input), indices)
        }
        LogicalPlan::Aggregate(Aggregate {
            input, group_expr, ..
        }) => {
            let num_group_exprs = group_expr.len();
            if indices.iter().any(|&i| i >= num_group_exprs) {
                return internal_err!("Can't push down projection through aggregate functions");
            }

            let new_exprs = group_expr
                .into_iter()
                .enumerate()
                .filter_map(|(i, expr)| indices.contains(&i).then_some(expr))
                .collect_vec();

            let child_indices = new_exprs
                .iter()
                .flat_map(|e| e.column_refs().into_iter())
                .map(|c| input.schema().index_of_column(c).unwrap())
                .collect::<HashSet<_>>();

            Projection::try_new(
                new_exprs,
                pushdown_projection_inexact(Arc::unwrap_or_clone(input), &child_indices)
                    .map(Arc::new)?,
            )
            .map(LogicalPlan::Projection)
        }
        LogicalPlan::Join(ref join) => {
            let join_type = join.join_type;
            match join_type {
                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {}
                _ => {
                    return Err(DataFusionError::Internal(format!(
                        "unsupported join type: {join_type}"
                    )))
                }
            };

            let mut indices = indices.clone();

            let filter = join
                .filter
                .as_ref()
                .map(|f| widen_filter(f, &mut indices, &plan))
                .transpose()?;

            let (mut left_child_indices, mut right_child_indices) =
                indices.iter().partition_map(|&i| {
                    if i < join.left.schema().fields().len() {
                        Either::Left(i)
                    } else {
                        Either::Right(i - join.left.schema().fields().len())
                    }
                });

            let on = join.on.iter().try_fold(vec![], |mut v, (lexpr, rexpr)| {
                if expr_is_relevant(lexpr, &left_child_indices, &join.left)?
                    || expr_is_relevant(rexpr, &right_child_indices, &join.right)?
                {
                    add_all_columns_to_indices(lexpr, &mut left_child_indices, &join.left)?;
                    add_all_columns_to_indices(rexpr, &mut right_child_indices, &join.right)?;
                    v.push((lexpr.clone(), rexpr.clone()))
                }

                Ok::<_, DataFusionError>(v)
            })?;

            let join = match plan {
                LogicalPlan::Join(join) => join,
                _ => unreachable!(),
            };

            let left =
                pushdown_projection_inexact(Arc::unwrap_or_clone(join.left), &left_child_indices)
                    .map(Arc::new)?;
            let right =
                pushdown_projection_inexact(Arc::unwrap_or_clone(join.right), &right_child_indices)
                    .map(Arc::new)?;

            let schema = project_dfschema(join.schema.as_ref(), &indices).map(Arc::new)?;

            Ok(LogicalPlan::Join(Join {
                left,
                right,
                on,
                filter,
                join_type,
                schema,
                ..join
            }))
        }
        LogicalPlan::Union(Union { inputs, schema, .. }) => {
            let inputs = inputs
                .into_iter()
                .map(Arc::unwrap_or_clone)
                .map(|plan| pushdown_projection_inexact(plan, indices))
                .map_ok(Arc::new)
                .collect::<Result<Vec<_>>>()?;

            Ok(LogicalPlan::Union(Union {
                inputs,
                schema: project_dfschema(schema.as_ref(), indices).map(Arc::new)?,
            }))
        }
        LogicalPlan::TableScan(ref scan) => {
            let mut indices = indices.clone();
            let filters = scan
                .filters
                .iter()
                .map(|f| widen_filter(f, &mut indices, &plan))
                .collect::<Result<Vec<_>>>()?;

            let new_projection = scan
                .projection
                .clone()
                .unwrap_or((0..scan.source.schema().fields().len()).collect())
                .into_iter()
                .enumerate()
                .filter_map(|(i, j)| indices.contains(&i).then_some(j))
                .collect_vec();

            let scan = match plan {
                LogicalPlan::TableScan(scan) => scan,
                _ => unreachable!(),
            };

            TableScan::try_new(
                scan.table_name,
                scan.source,
                Some(new_projection),
                filters,
                None,
            )
            .map(LogicalPlan::TableScan)
        }
        LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row,
            schema,
        }) => Ok(LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row,
            schema: project_dfschema(schema.as_ref(), indices).map(Arc::new)?,
        })),
        LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => SubqueryAlias::try_new(
            pushdown_projection_inexact(Arc::unwrap_or_clone(input), indices).map(Arc::new)?,
            alias,
        )
        .map(LogicalPlan::SubqueryAlias),
        LogicalPlan::Limit(Limit { input, .. }) | LogicalPlan::Sort(Sort { input, .. }) => {
            pushdown_projection_inexact(Arc::unwrap_or_clone(input), indices)
        }
        LogicalPlan::Values(Values { schema, values }) => {
            let schema = project_dfschema(&schema, indices).map(Arc::new)?;
            let values = values
                .into_iter()
                .map(|row| {
                    row.into_iter()
                        .enumerate()
                        .filter_map(|(i, v)| indices.contains(&i).then_some(v))
                        .collect_vec()
                })
                .collect_vec();

            Ok(LogicalPlan::Values(Values { schema, values }))
        }
        LogicalPlan::Distinct(Distinct::All(input)) => {
            pushdown_projection_inexact(Arc::unwrap_or_clone(input), indices)
                .map(Arc::new)
                .map(Distinct::All)
                .map(LogicalPlan::Distinct)
        }
        LogicalPlan::Unnest(unnest) => {
            let child_indices = indices
                .iter()
                .map(|&i| unnest.dependency_indices[i])
                .collect::<HashSet<_>>();

            let input_using_columns = unnest.input.using_columns()?;
            let input_schema = unnest.input.schema();
            let columns_to_unnest =
                unnest
                    .exec_columns
                    .into_iter()
                    .try_fold(vec![], |mut v, c| {
                        let c = c.normalize_with_schemas_and_ambiguity_check(
                            &[&[input_schema.as_ref()]],
                            &input_using_columns,
                        )?;
                        let idx = input_schema.index_of_column(&c)?;
                        if child_indices.contains(&idx) {
                            v.push(c);
                        }

                        Ok::<_, DataFusionError>(v)
                    })?;

            let columns_to_project = unnest
                .schema
                .columns()
                .into_iter()
                .enumerate()
                .filter_map(|(i, c)| indices.contains(&i).then_some(c))
                .map(Expr::Column)
                .collect_vec();

            if columns_to_unnest.is_empty() {
                return LogicalPlanBuilder::from(pushdown_projection_inexact(
                    Arc::unwrap_or_clone(unnest.input),
                    &child_indices,
                )?)
                .project(columns_to_project)?
                .build();
            }

            LogicalPlanBuilder::from(pushdown_projection_inexact(
                Arc::unwrap_or_clone(unnest.input),
                &child_indices,
            )?)
            .unnest_columns_with_options(columns_to_unnest, unnest.options)?
            .project(columns_to_project)?
            .build()
        }

        _ => internal_err!("Unsupported logical plan node: {}", plan.display()),
    }
    .map_err(|e| e.context(format!("plan: \n{plan_formatted}")))
}

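/// Widens a predicate: of its conjuncts, keeps only those that reference a column in
/// `indices` (adding every column they mention to `indices`) and replaces the rest
/// with TRUE, so the filtered result can only grow.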
fn widen_filter(
    predicate: &Expr,
    indices: &mut HashSet<usize>,
    parent: &LogicalPlan,
) -> Result<Expr> {
    let conjunctions = split_conjunction(predicate);

    conjunctions.into_iter().try_fold(lit(true), |a, b| {
        Ok(if expr_is_relevant(b, indices, parent)? {
            add_all_columns_to_indices(b, indices, parent)?;
            a.and(b.clone())
        } else {
            a
        })
    })
}

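/// Returns true if `expr` references any column whose index in `parent`'s schema is
/// in `indices`.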
fn expr_is_relevant(expr: &Expr, indices: &HashSet<usize>, parent: &LogicalPlan) -> Result<bool> {
    let schemas = parent
        .inputs()
        .iter()
        .map(|input| input.schema().as_ref())
        .collect_vec();
    let using_columns = parent.using_columns()?;

    for c in expr.column_refs() {
        let normalized_column = c
            .clone()
            .normalize_with_schemas_and_ambiguity_check(&[&schemas], &using_columns)?;
        let column_idx = parent.schema().index_of_column(&normalized_column)?;

        if indices.contains(&column_idx) {
            return Ok(true);
        }
    }

    Ok(false)
}

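/// Adds the index of every column referenced by `expr` (resolved against `parent`'s
/// schema) to `indices`.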
fn add_all_columns_to_indices(
    expr: &Expr,
    indices: &mut HashSet<usize>,
    parent: &LogicalPlan,
) -> Result<()> {
    let schemas = parent
        .inputs()
        .iter()
        .map(|input| input.schema().as_ref())
        .collect_vec();
    let using_columns = parent.using_columns()?;

    for c in expr.column_refs() {
        let normalized_column = c
            .clone()
            .normalize_with_schemas_and_ambiguity_check(&[&schemas], &using_columns)?;
        let column_idx = parent.schema().index_of_column(&normalized_column)?;

        indices.insert(column_idx);
    }

    Ok(())
}

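/// Projects a [`DFSchema`] down to the fields at `indices`, preserving field
/// qualifiers and schema metadata.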
fn project_dfschema(schema: &DFSchema, indices: &HashSet<usize>) -> Result<DFSchema> {
    let qualified_fields = (0..schema.fields().len())
        .filter_map(|i| {
            indices.contains(&i).then_some({
                let (reference, field) = schema.qualified_field(i);
                (reference.cloned(), Arc::new(field.clone()))
            })
        })
        .collect_vec();

    DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())
}

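/// Rewrites the plan bottom-up so that every node also outputs a metadata column:
/// table scans are replaced with scans of their row metadata source, and every other
/// node concatenates the metadata columns of its inputs with `flatten(make_array(..))`
/// under a fresh alias.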
fn push_up_file_metadata(
    plan: LogicalPlan,
    catalog_options: &CatalogOptions,
    row_metadata_registry: &RowMetadataRegistry,
) -> Result<LogicalPlan> {
    let alias_generator = AliasGenerator::new();
    plan.transform_up(|plan| {
        match plan {
            LogicalPlan::TableScan(scan) => {
                scan_columns_from_row_metadata(scan, catalog_options, row_metadata_registry)
            }
            plan => project_row_metadata_from_input(plan, &alias_generator),
        }
        .and_then(LogicalPlan::recompute_schema)
        .map(Transformed::yes)
    })
    .map(|t| t.data)
}

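/// For a non-scan node, gathers the metadata columns exposed by its input(s) into a
/// single aliased metadata column appended to the node's regular output.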
fn project_row_metadata_from_input(
    plan: LogicalPlan,
    alias_generator: &AliasGenerator,
) -> Result<LogicalPlan> {
    use datafusion_expr::logical_plan::*;

    match plan {
        LogicalPlan::Projection(Projection { expr, input, .. }) => {
            let file_md_columns = input
                .schema()
                .columns()
                .into_iter()
                .filter_map(|c| c.name.starts_with(META_COLUMN).then_some(Expr::Column(c)))
                .collect_vec();
            Projection::try_new(
                expr.into_iter()
                    .chain(Some(
                        flatten(make_array(file_md_columns))
                            .alias(alias_generator.next(META_COLUMN)),
                    ))
                    .collect_vec(),
                input,
            )
            .map(LogicalPlan::Projection)
        }
        _ => {
            let plan = plan.recompute_schema()?;
            let (file_md_columns, original_columns) = plan
                .schema()
                .columns()
                .into_iter()
                .partition::<Vec<_>, _>(|c| c.name.starts_with(META_COLUMN));

            Projection::try_new(
                original_columns
                    .into_iter()
                    .map(Expr::Column)
                    .chain(Some(
                        flatten(make_array(
                            file_md_columns.into_iter().map(Expr::Column).collect_vec(),
                        ))
                        .alias(alias_generator.next(META_COLUMN)),
                    ))
                    .collect_vec(),
                Arc::new(plan),
            )
            .map(LogicalPlan::Projection)
        }
    }
}

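/// Replaces a [`TableScan`] with a scan of the table's row metadata source,
/// projecting the originally scanned columns plus a singleton array of file metadata,
/// aliased back under the original table name with the scan's filters reapplied.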
fn scan_columns_from_row_metadata(
    scan: TableScan,
    catalog_options: &CatalogOptions,
    row_metadata_registry: &RowMetadataRegistry,
) -> Result<LogicalPlan> {
    let table_ref = scan.table_name.clone().resolve(
        &catalog_options.default_catalog,
        &catalog_options.default_schema,
    );

    let source = row_metadata_registry.get_source(&table_ref)?;

    let mut exprs = scan
        .projected_schema
        .fields()
        .iter()
        .map(|f| col((None, f)))
        .collect_vec();
    exprs.push(make_array(vec![col(META_COLUMN)]).alias(META_COLUMN));

    source
        .row_metadata(table_ref, &scan)?
        .project(exprs)?
        .alias(scan.table_name.clone())?
        .filter(
            scan.filters
                .clone()
                .into_iter()
                .fold(lit(true), |a, b| a.and(b)),
        )?
        .build()
}

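/// Handles the case where the view has no static partition columns: every source file
/// of every table scanned by the view's query is a dependency of the single
/// unpartitioned target, so the row metadata of all scanned tables is unioned.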
fn get_source_files_all_partitions(
    materialized_view: &dyn Materialized,
    catalog_options: &CatalogOptions,
    row_metadata_registry: &RowMetadataRegistry,
) -> Result<LogicalPlan> {
    use datafusion_common::tree_node::TreeNodeRecursion;

    let mut tables = std::collections::HashMap::<TableReference, _>::new();

    materialized_view
        .query()
        .apply(|plan| {
            if let LogicalPlan::TableScan(scan) = plan {
                tables.insert(scan.table_name.clone(), Arc::clone(&scan.source));
            }

            Ok(TreeNodeRecursion::Continue)
        })
        .unwrap();

    tables
        .into_iter()
        .try_fold(
            None::<LogicalPlanBuilder>,
            |maybe_plan, (table_ref, source)| {
                let resolved_ref = table_ref.clone().resolve(
                    &catalog_options.default_catalog,
                    &catalog_options.default_schema,
                );

                let row_metadata = row_metadata_registry.get_source(&resolved_ref)?;
                let row_metadata_scan = row_metadata
                    .row_metadata(
                        resolved_ref,
                        &TableScan {
                            table_name: table_ref.clone(),
                            source,
                            projection: Some(vec![]),
                            projected_schema: Arc::new(DFSchema::empty()),
                            filters: vec![],
                            fetch: None,
                        },
                    )?
                    .build()?;

                if let Some(previous) = maybe_plan {
                    previous.union(row_metadata_scan)
                } else {
                    Ok(LogicalPlanBuilder::from(row_metadata_scan))
                }
                .map(Some)
            },
        )?
        .ok_or_else(|| DataFusionError::Plan("materialized view has no source tables".into()))?
        .project(vec![make_array(vec![col(META_COLUMN)]).alias(META_COLUMN)])?
        .build()
}

#[cfg(test)]
mod test {
    use std::{any::Any, collections::HashSet, sync::Arc};

    use arrow::util::pretty::pretty_format_batches;
    use arrow_schema::{DataType, Field, FieldRef, Fields, SchemaRef};
    use datafusion::{
        assert_batches_eq, assert_batches_sorted_eq,
        catalog::{Session, TableProvider},
        datasource::listing::ListingTableUrl,
        execution::session_state::SessionStateBuilder,
        prelude::{DataFrame, SessionConfig, SessionContext},
    };
    use datafusion_common::{Column, DFSchema, Result, ScalarValue};
    use datafusion_expr::builder::unnest;
    use datafusion_expr::{EmptyRelation, Expr, JoinType, LogicalPlan, TableType};
    use datafusion_physical_plan::ExecutionPlan;
    use itertools::Itertools;

    use crate::materialized::{
        dependencies::pushdown_projection_inexact,
        register_decorator, register_materialized,
        row_metadata::{ObjectStoreRowMetadataSource, RowMetadataRegistry},
        Decorator, ListingTableLike, Materialized,
    };

    use super::{mv_dependencies, stale_files};

    #[derive(Debug)]
    struct MockMaterializedView {
        table_path: ListingTableUrl,
        partition_columns: Vec<String>,
        static_partition_columns: Option<Vec<String>>,
        query: LogicalPlan,
        file_ext: &'static str,
    }

    #[async_trait::async_trait]
    impl TableProvider for MockMaterializedView {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn schema(&self) -> SchemaRef {
            Arc::new(self.query.schema().as_arrow().clone())
        }

        fn table_type(&self) -> TableType {
            TableType::Base
        }

        async fn scan(
            &self,
            _state: &dyn Session,
            _projection: Option<&Vec<usize>>,
            _filters: &[Expr],
            _limit: Option<usize>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }
    }

    impl ListingTableLike for MockMaterializedView {
        fn table_paths(&self) -> Vec<ListingTableUrl> {
            vec![self.table_path.clone()]
        }

        fn partition_columns(&self) -> Vec<String> {
            self.partition_columns.clone()
        }

        fn file_ext(&self) -> String {
            self.file_ext.to_string()
        }
    }

    impl Materialized for MockMaterializedView {
        fn query(&self) -> LogicalPlan {
            self.query.clone()
        }

        fn static_partition_columns(&self) -> Vec<String> {
            self.static_partition_columns
                .clone()
                .unwrap_or_else(|| self.partition_columns.clone())
        }
    }

    #[derive(Debug)]
    struct DecoratorTable {
        inner: Arc<dyn TableProvider>,
    }

    #[async_trait::async_trait]
    impl TableProvider for DecoratorTable {
        fn as_any(&self) -> &dyn Any {
            self
        }

        fn schema(&self) -> SchemaRef {
            self.inner.schema()
        }

        fn table_type(&self) -> TableType {
            self.inner.table_type()
        }

        async fn scan(
            &self,
            state: &dyn Session,
            projection: Option<&Vec<usize>>,
            filters: &[Expr],
            limit: Option<usize>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            self.inner.scan(state, projection, filters, limit).await
        }
    }

    impl Decorator for DecoratorTable {
        fn base(&self) -> &dyn TableProvider {
            self.inner.as_ref()
        }
    }

    async fn setup() -> Result<SessionContext> {
        let _ = env_logger::builder().is_test(true).try_init();

        register_materialized::<MockMaterializedView>();
        register_decorator::<DecoratorTable>();

        let state = SessionStateBuilder::new()
            .with_default_features()
            .with_config(
                SessionConfig::new()
                    .with_default_catalog_and_schema("datafusion", "test")
                    .set(
                        "datafusion.explain.logical_plan_only",
                        &ScalarValue::Boolean(Some(true)),
                    )
                    .set(
                        "datafusion.sql_parser.dialect",
                        &ScalarValue::Utf8(Some("duckdb".into())),
                    )
                    .set(
                        "datafusion.execution.skip_physical_aggregate_schema_check",
                        &ScalarValue::Boolean(Some(true)),
                    ),
            )
            .build();

        let ctx = SessionContext::new_with_state(state);

        ctx.sql(
            "CREATE TABLE t1 AS VALUES
                ('2021', 3, 'A'),
                ('2022', 4, 'B'),
                ('2023', 5, 'C')",
        )
        .await?
        .collect()
        .await?;

        ctx.sql(
            "CREATE TABLE t2 (
                year STRING,
                month STRING,
                day STRING,
                feed CHAR,
                column2 INTEGER
            ) AS VALUES
                ('2023', '01', '01', 'A', 1),
                ('2023', '01', '02', 'B', 2),
                ('2023', '01', '03', 'C', 3),
                ('2024', '12', '04', 'X', 4),
                ('2024', '12', '05', 'Y', 5),
                ('2024', '12', '06', 'Z', 6)",
        )
        .await?
        .collect()
        .await?;

        ctx.sql(
            "CREATE TABLE t3 (
                year STRING,
                column1 INTEGER
            ) AS VALUES
                (2023, 1),
                (2024, 2)",
        )
        .await?
        .collect()
        .await?;

        ctx.sql(
            "CREATE TABLE file_metadata (
                table_catalog STRING,
                table_schema STRING,
                table_name STRING,
                file_path STRING,
                last_modified TIMESTAMP,
                size BIGINT UNSIGNED
            ) AS VALUES
            ('datafusion', 'test', 't1', 's3://t1/column1=2021/data.01.parquet', '2023-07-11T16:29:26Z', 0),
            ('datafusion', 'test', 't1', 's3://t1/column1=2022/data.01.parquet', '2023-07-11T16:45:22Z', 0),
            ('datafusion', 'test', 't1', 's3://t1/column1=2023/data.01.parquet', '2023-07-11T16:45:44Z', 0),
            ('datafusion', 'test', 't2', 's3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet', '2023-07-11T16:29:26Z', 0),
            ('datafusion', 'test', 't2', 's3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet', '2023-07-11T16:45:22Z', 0),
            ('datafusion', 'test', 't2', 's3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet', '2023-07-11T16:45:44Z', 0),
            ('datafusion', 'test', 't2', 's3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet', '2023-07-11T16:29:26Z', 0),
            ('datafusion', 'test', 't2', 's3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet', '2023-07-11T16:45:22Z', 0),
            ('datafusion', 'test', 't2', 's3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet', '2023-07-11T16:45:44Z', 0),
            ('datafusion', 'test', 't3', 's3://t3/year=2023/data.01.parquet', '2023-07-11T16:45:44Z', 0),
            ('datafusion', 'test', 't3', 's3://t3/year=2024/data.01.parquet', '2023-07-11T16:45:44Z', 0)
            ",
        )
        .await?
        .collect()
        .await?;

        let metadata_table = ctx.table_provider("file_metadata").await?;
        let object_store_metadata_source = Arc::new(
            ObjectStoreRowMetadataSource::with_file_metadata(Arc::clone(&metadata_table)),
        );

        let row_metadata_registry = Arc::new(RowMetadataRegistry::new_with_default_source(
            object_store_metadata_source,
        ));

        ctx.register_udtf(
            "mv_dependencies",
            mv_dependencies(
                Arc::clone(ctx.state().catalog_list()),
                row_metadata_registry.clone(),
                ctx.copied_config().options(),
            ),
        );

        ctx.register_udtf(
            "stale_files",
            stale_files(
                Arc::clone(ctx.state().catalog_list()),
                Arc::clone(&row_metadata_registry),
                metadata_table,
                ctx.copied_config().options(),
            ),
        );

        Ok(ctx)
    }

    #[tokio::test]
    async fn test_deps() {
        #[derive(Debug, Default)]
        struct TestCase {
            name: &'static str,
            query_to_analyze: &'static str,
            table_name: &'static str,
            table_path: &'static str,
            partition_cols: Vec<&'static str>,
            static_partition_cols: Option<Vec<&'static str>>,
            file_extension: &'static str,
            expected_output: Vec<&'static str>,
            file_metadata: &'static str,
            expected_stale_files_output: Vec<&'static str>,
        }

        let cases = &[
            TestCase {
                name: "un-transformed partition column",
                query_to_analyze:
                    "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1",
                table_name: "m1",
                table_path: "s3://m1/",
                partition_cols: vec!["partition_column"],
                file_extension: ".parquet",
                expected_output: vec![
                    "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
                    "| target                         | source_table_catalog | source_table_schema | source_table_name | source_uri                           | source_last_modified |",
                    "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
                    "| s3://m1/partition_column=2021/ | datafusion           | test                | t1                | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26  |",
                    "| s3://m1/partition_column=2022/ | datafusion           | test                | t1                | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22  |",
                    "| s3://m1/partition_column=2023/ | datafusion           | test                | t1                | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44  |",
                    "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
                ],
                file_metadata: "
                    ('datafusion', 'test', 'm1', 's3://m1/partition_column=2021/data.01.parquet', '2023-07-12T16:00:00Z', 0),
                    ('datafusion', 'test', 'm1', 's3://m1/partition_column=2022/data.01.parquet', '2023-07-10T16:00:00Z', 0),
                    ('datafusion', 'test', 'm1', 's3://m1/partition_column=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0)
                ",
                expected_stale_files_output: vec![
                    "+--------------------------------+----------------------+-----------------------+----------+",
                    "| target                         | target_last_modified | sources_last_modified | is_stale |",
                    "+--------------------------------+----------------------+-----------------------+----------+",
                    "| s3://m1/partition_column=2021/ | 2023-07-12T16:00:00  | 2023-07-11T16:29:26   | false    |",
                    "| s3://m1/partition_column=2022/ | 2023-07-10T16:00:00  | 2023-07-11T16:45:22   | true     |",
                    "| s3://m1/partition_column=2023/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
                    "+--------------------------------+----------------------+-----------------------+----------+",
                ],
                ..Default::default()
            },
            TestCase {
                name: "omit internal metadata partition columns",
                query_to_analyze:
                    "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1",
                table_name: "m1",
                table_path: "s3://m1/",
                partition_cols: vec!["partition_column"],
                file_extension: ".parquet",
                expected_output: vec![
                    "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
                    "| target                         | source_table_catalog | source_table_schema | source_table_name | source_uri                           | source_last_modified |",
                    "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
                    "| s3://m1/partition_column=2021/ | datafusion           | test                | t1                | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26  |",
                    "| s3://m1/partition_column=2022/ | datafusion           | test                | t1                | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22  |",
                    "| s3://m1/partition_column=2023/ | datafusion           | test                | t1                | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44  |",
                    "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
                ],
                file_metadata: "
                    ('datafusion', 'test', 'm1', 's3://m1/partition_column=2021/_v=123/data.01.parquet', '2023-07-12T16:00:00Z', 0),
                    ('datafusion', 'test', 'm1', 's3://m1/partition_column=2022/_v=123/data.01.parquet', '2023-07-10T16:00:00Z', 0),
                    ('datafusion', 'test', 'm1', 's3://m1/partition_column=2023/_v=123/data.01.parquet', '2023-07-12T16:00:00Z', 0)
                ",
                expected_stale_files_output: vec![
                    "+--------------------------------+----------------------+-----------------------+----------+",
                    "| target                         | target_last_modified | sources_last_modified | is_stale |",
                    "+--------------------------------+----------------------+-----------------------+----------+",
                    "| s3://m1/partition_column=2021/ | 2023-07-12T16:00:00  | 2023-07-11T16:29:26   | false    |",
                    "| s3://m1/partition_column=2022/ | 2023-07-10T16:00:00  | 2023-07-11T16:45:22   | true     |",
                    "| s3://m1/partition_column=2023/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
                    "+--------------------------------+----------------------+-----------------------+----------+",
                ],
                ..Default::default()
            },
            TestCase {
                name: "transform year/month/day partition into timestamp partition",
                query_to_analyze: "
                    SELECT DISTINCT
                        to_timestamp_nanos(concat_ws('-', year, month, day)) AS timestamp,
                        feed
                    FROM t2",
                table_name: "m2",
                table_path: "s3://m2/",
                partition_cols: vec!["timestamp", "feed"],
                file_extension: ".parquet",
                expected_output: vec![
                    "+-----------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
                    "| target                                        | source_table_catalog | source_table_schema | source_table_name | source_uri                                               | source_last_modified |",
                    "+-----------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
                    "| s3://m2/timestamp=2023-01-01T00:00:00/feed=A/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26  |",
                    "| s3://m2/timestamp=2023-01-02T00:00:00/feed=B/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22  |",
                    "| s3://m2/timestamp=2023-01-03T00:00:00/feed=C/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44  |",
                    "| s3://m2/timestamp=2024-12-04T00:00:00/feed=X/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26  |",
                    "| s3://m2/timestamp=2024-12-05T00:00:00/feed=Y/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22  |",
                    "| s3://m2/timestamp=2024-12-06T00:00:00/feed=Z/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44  |",
                    "+-----------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
                ],
                file_metadata: "
                    ('datafusion', 'test', 'm2', 's3://m2/timestamp=2023-01-01T00:00:00/feed=A/data.01.parquet', '2023-07-12T16:00:00Z', 0),
                    ('datafusion', 'test', 'm2', 's3://m2/timestamp=2023-01-02T00:00:00/feed=B/data.01.parquet', '2023-07-12T16:00:00Z', 0),
                    ('datafusion', 'test', 'm2', 's3://m2/timestamp=2023-01-03T00:00:00/feed=C/data.01.parquet', '2023-07-10T16:00:00Z', 0),
                    ('datafusion', 'test', 'm2', 's3://m2/timestamp=2024-12-04T00:00:00/feed=X/data.01.parquet', '2023-07-12T16:00:00Z', 0),
                    ('datafusion', 'test', 'm2', 's3://m2/timestamp=2024-12-05T00:00:00/feed=Y/data.01.parquet', '2023-07-12T16:00:00Z', 0),
                    ('datafusion', 'test', 'm2', 's3://m2/timestamp=2024-12-06T00:00:00/feed=Z/data.01.parquet', '2023-07-10T16:00:00Z', 0)
                ",
                expected_stale_files_output: vec![
                    "+-----------------------------------------------+----------------------+-----------------------+----------+",
                    "| target                                        | target_last_modified | sources_last_modified | is_stale |",
                    "+-----------------------------------------------+----------------------+-----------------------+----------+",
                    "| s3://m2/timestamp=2023-01-01T00:00:00/feed=A/ | 2023-07-12T16:00:00  | 2023-07-11T16:29:26   | false    |",
                    "| s3://m2/timestamp=2023-01-02T00:00:00/feed=B/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:22   | false    |",
                    "| s3://m2/timestamp=2023-01-03T00:00:00/feed=C/ | 2023-07-10T16:00:00  | 2023-07-11T16:45:44   | true     |",
                    "| s3://m2/timestamp=2024-12-04T00:00:00/feed=X/ | 2023-07-12T16:00:00  | 2023-07-11T16:29:26   | false    |",
                    "| s3://m2/timestamp=2024-12-05T00:00:00/feed=Y/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:22   | false    |",
                    "| s3://m2/timestamp=2024-12-06T00:00:00/feed=Z/ | 2023-07-10T16:00:00  | 2023-07-11T16:45:44   | true     |",
                    "+-----------------------------------------------+----------------------+-----------------------+----------+",
                ],
                ..Default::default()
            },
            TestCase {
                name: "omit dynamic partition columns",
                query_to_analyze: "
                    SELECT
                        year,
                        month,
                        day,
                        column2,
                        COUNT(*) AS ct
                    FROM t2
                    GROUP BY year, month, day, column2
                ",
                table_name: "m_dynamic",
                table_path: "s3://m_dynamic/",
                partition_cols: vec!["year", "month", "day", "column2"],
                static_partition_cols: Some(vec!["year", "month", "day"]),
                file_extension: ".parquet",
                expected_output: vec![
                    "+-------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
                    "| target                                    | source_table_catalog | source_table_schema | source_table_name | source_uri                                               | source_last_modified |",
                    "+-------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
                    "| s3://m_dynamic/year=2023/month=01/day=01/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26  |",
                    "| s3://m_dynamic/year=2023/month=01/day=02/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22  |",
                    "| s3://m_dynamic/year=2023/month=01/day=03/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44  |",
                    "| s3://m_dynamic/year=2024/month=12/day=04/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26  |",
                    "| s3://m_dynamic/year=2024/month=12/day=05/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22  |",
                    "| s3://m_dynamic/year=2024/month=12/day=06/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44  |",
                    "+-------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
                ],
                file_metadata: "
                    ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2023/month=01/day=01/column2=1/data.01.parquet', '2023-07-12T16:00:00Z', 0),
                    ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2023/month=01/day=02/column2=2/data.01.parquet', '2023-07-12T16:00:00Z', 0),
                    ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2023/month=01/day=03/column2=3/data.01.parquet', '2023-07-10T16:00:00Z', 0),
                    ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2024/month=12/day=04/column2=4/data.01.parquet', '2023-07-12T16:00:00Z', 0),
                    ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2024/month=12/day=05/column2=5/data.01.parquet', '2023-07-12T16:00:00Z', 0),
                    ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2024/month=12/day=06/column2=6/data.01.parquet', '2023-07-10T16:00:00Z', 0)
                ",
                expected_stale_files_output: vec![
                    "+-------------------------------------------+----------------------+-----------------------+----------+",
                    "| target                                    | target_last_modified | sources_last_modified | is_stale |",
                    "+-------------------------------------------+----------------------+-----------------------+----------+",
                    "| s3://m_dynamic/year=2023/month=01/day=01/ | 2023-07-12T16:00:00  | 2023-07-11T16:29:26   | false    |",
                    "| s3://m_dynamic/year=2023/month=01/day=02/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:22   | false    |",
                    "| s3://m_dynamic/year=2023/month=01/day=03/ | 2023-07-10T16:00:00  | 2023-07-11T16:45:44   | true     |",
                    "| s3://m_dynamic/year=2024/month=12/day=04/ | 2023-07-12T16:00:00  | 2023-07-11T16:29:26   | false    |",
                    "| s3://m_dynamic/year=2024/month=12/day=05/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:22   | false    |",
                    "| s3://m_dynamic/year=2024/month=12/day=06/ | 2023-07-10T16:00:00  | 2023-07-11T16:45:44   | true     |",
                    "+-------------------------------------------+----------------------+-----------------------+----------+",
                ],
            },
            TestCase {
                name: "materialized view has no partitions",
                query_to_analyze: "SELECT column1 AS output FROM t3",
                table_name: "m3",
                table_path: "s3://m3/",
                partition_cols: vec![],
                file_extension: ".parquet",
                expected_output: vec![
                    "+----------+----------------------+---------------------+-------------------+-----------------------------------+----------------------+",
                    "| target   | source_table_catalog | source_table_schema | source_table_name | source_uri                        | source_last_modified |",
                    "+----------+----------------------+---------------------+-------------------+-----------------------------------+----------------------+",
                    "| s3://m3/ | datafusion           | test                | t3                | s3://t3/year=2023/data.01.parquet | 2023-07-11T16:45:44  |",
                    "| s3://m3/ | datafusion           | test                | t3                | s3://t3/year=2024/data.01.parquet | 2023-07-11T16:45:44  |",
                    "+----------+----------------------+---------------------+-------------------+-----------------------------------+----------------------+",
                ],
                file_metadata: "
                    ('datafusion', 'test', 'm3', 's3://m3/data.01.parquet', '2023-07-12T16:00:00Z', 0)
                ",
                expected_stale_files_output: vec![
                    "+----------+----------------------+-----------------------+----------+",
                    "| target   | target_last_modified | sources_last_modified | is_stale |",
                    "+----------+----------------------+-----------------------+----------+",
                    "| s3://m3/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
                    "+----------+----------------------+-----------------------+----------+",
                ],
                ..Default::default()
            },
            TestCase {
                name: "simple equijoin on year",
                query_to_analyze: "SELECT * FROM t2 INNER JOIN t3 USING (year)",
                table_name: "m4",
                table_path: "s3://m4/",
                partition_cols: vec!["year"],
                file_extension: ".parquet",
                expected_output: vec![
                    "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
                    "| target             | source_table_catalog | source_table_schema | source_table_name | source_uri                                               | source_last_modified |",
                    "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
                    "| s3://m4/year=2023/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26  |",
                    "| s3://m4/year=2023/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22  |",
                    "| s3://m4/year=2023/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44  |",
                    "| s3://m4/year=2023/ | datafusion           | test                | t3                | s3://t3/year=2023/data.01.parquet                        | 2023-07-11T16:45:44  |",
                    "| s3://m4/year=2024/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26  |",
                    "| s3://m4/year=2024/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22  |",
                    "| s3://m4/year=2024/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44  |",
                    "| s3://m4/year=2024/ | datafusion           | test                | t3                | s3://t3/year=2024/data.01.parquet                        | 2023-07-11T16:45:44  |",
                    "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
                ],
                file_metadata: "
                    ('datafusion', 'test', 'm4', 's3://m4/year=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0),
                    ('datafusion', 'test', 'm4', 's3://m4/year=2024/data.01.parquet', '2023-07-12T16:00:00Z', 0)
                ",
                expected_stale_files_output: vec![
                    "+--------------------+----------------------+-----------------------+----------+",
                    "| target             | target_last_modified | sources_last_modified | is_stale |",
                    "+--------------------+----------------------+-----------------------+----------+",
                    "| s3://m4/year=2023/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
                    "| s3://m4/year=2024/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
                    "+--------------------+----------------------+-----------------------+----------+",
                ],
                ..Default::default()
            },
            TestCase {
                name: "triangular join on year",
                query_to_analyze: "
                    SELECT
                        t2.*,
                        t3.* EXCLUDE(year),
                        t3.year AS \"t3.year\"
                    FROM t2
                    INNER JOIN t3
                    ON (t2.year <= t3.year)",
                table_name: "m4",
                table_path: "s3://m4/",
                partition_cols: vec!["year"],
                file_extension: ".parquet",
                expected_output: vec![
                    "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
                    "| target             | source_table_catalog | source_table_schema | source_table_name | source_uri                                               | source_last_modified |",
                    "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
                    "| s3://m4/year=2023/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26  |",
                    "| s3://m4/year=2023/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22  |",
                    "| s3://m4/year=2023/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44  |",
                    "| s3://m4/year=2023/ | datafusion           | test                | t3                | s3://t3/year=2023/data.01.parquet                        | 2023-07-11T16:45:44  |",
                    "| s3://m4/year=2023/ | datafusion           | test                | t3                | s3://t3/year=2024/data.01.parquet                        | 2023-07-11T16:45:44  |",
                    "| s3://m4/year=2024/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26  |",
                    "| s3://m4/year=2024/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22  |",
                    "| s3://m4/year=2024/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44  |",
                    "| s3://m4/year=2024/ | datafusion           | test                | t3                | s3://t3/year=2024/data.01.parquet                        | 2023-07-11T16:45:44  |",
                    "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
                ],
                file_metadata: "
                    ('datafusion', 'test', 'm4', 's3://m4/year=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0),
                    ('datafusion', 'test', 'm4', 's3://m4/year=2024/data.01.parquet', '2023-07-12T16:00:00Z', 0)
                ",
                expected_stale_files_output: vec![
                    "+--------------------+----------------------+-----------------------+----------+",
                    "| target             | target_last_modified | sources_last_modified | is_stale |",
                    "+--------------------+----------------------+-----------------------+----------+",
                    "| s3://m4/year=2023/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
                    "| s3://m4/year=2024/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
                    "+--------------------+----------------------+-----------------------+----------+",
                ],
                ..Default::default()
            },
            TestCase {
                name: "triangular left join, strict <",
                query_to_analyze: "
                    SELECT
                        t2.*,
                        t3.* EXCLUDE(year),
                        t3.year AS \"t3.year\"
                    FROM t2
                    LEFT JOIN t3
                    ON (t2.year < t3.year)",
                table_name: "m4",
                table_path: "s3://m4/",
                partition_cols: vec!["year"],
                file_extension: ".parquet",
                expected_output: vec![
                    "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
                    "| target             | source_table_catalog | source_table_schema | source_table_name | source_uri                                               | source_last_modified |",
                    "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
                    "| s3://m4/year=2023/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26  |",
                    "| s3://m4/year=2023/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22  |",
                    "| s3://m4/year=2023/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44  |",
                    "| s3://m4/year=2023/ | datafusion           | test                | t3                | s3://t3/year=2024/data.01.parquet                        | 2023-07-11T16:45:44  |",
                    "| s3://m4/year=2024/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26  |",
                    "| s3://m4/year=2024/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22  |",
                    "| s3://m4/year=2024/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44  |",
                    "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
                ],
                file_metadata: "
                    ('datafusion', 'test', 'm4', 's3://m4/year=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0),
                    ('datafusion', 'test', 'm4', 's3://m4/year=2024/data.01.parquet', '2023-07-12T16:00:00Z', 0)
                ",
                expected_stale_files_output: vec![
                    "+--------------------+----------------------+-----------------------+----------+",
                    "| target             | target_last_modified | sources_last_modified | is_stale |",
                    "+--------------------+----------------------+-----------------------+----------+",
                    "| s3://m4/year=2023/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
                    "| s3://m4/year=2024/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
                    "+--------------------+----------------------+-----------------------+----------+",
                ],
                ..Default::default()
            },
        ];

        async fn run_test(case: &TestCase) -> Result<()> {
            let context = setup().await.unwrap();

            let plan = context
                .sql(case.query_to_analyze)
                .await?
                .into_optimized_plan()?;

            println!("original plan: \n{}", plan.display_indent());

            let partition_col_indices = plan
                .schema()
                .columns()
                .into_iter()
                .enumerate()
                .filter_map(|(i, c)| case.partition_cols.contains(&c.name.as_str()).then_some(i))
                .collect();
            println!("indices: {partition_col_indices:?}");
            let analyzed = pushdown_projection_inexact(plan.clone(), &partition_col_indices)?;
            println!(
                "inexact projection pushdown:\n{}",
                analyzed.display_indent()
            );

            context
                .register_table(
                    case.table_name,
                    Arc::new(DecoratorTable {
                        inner: Arc::new(MockMaterializedView {
                            table_path: ListingTableUrl::parse(case.table_path).unwrap(),
                            partition_columns: case
                                .partition_cols
                                .iter()
                                .map(|s| s.to_string())
                                .collect(),
                            static_partition_columns: case
                                .static_partition_cols
                                .as_ref()
                                .map(|list| list.iter().map(|s| s.to_string()).collect()),
                            query: plan,
                            file_ext: case.file_extension,
                        }),
                    }),
                )
                .expect("couldn't register materialized view");

            context
                .sql(&format!(
                    "INSERT INTO file_metadata VALUES {}",
                    case.file_metadata,
                ))
                .await?
                .collect()
                .await?;

            context
                .sql(&format!(
                    "SELECT * FROM file_metadata WHERE table_name = '{}'",
                    case.table_name
                ))
                .await?
                .show()
                .await?;

            let df = context
                .sql(&format!(
                    "SELECT * FROM mv_dependencies('{}', 'v2')",
                    case.table_name,
                ))
                .await
                .map_err(|e| e.context("get file dependencies"))?;
            df.clone().explain(false, false)?.show().await?;
            df.clone().show().await?;

            assert_batches_sorted_eq!(case.expected_output, &df.collect().await?);

            let df = context
                .sql(&format!(
                    "SELECT * FROM stale_files('{}', 'v2')",
                    case.table_name
                ))
                .await
                .map_err(|e| e.context("get stale files"))?;
            df.clone().explain(false, false)?.show().await?;
            df.clone().show().await?;

            assert_batches_sorted_eq!(case.expected_stale_files_output, &df.collect().await?);

            Ok(())
        }

        for case in cases {
            run_test(case)
                .await
                .unwrap_or_else(|e| panic!("{} failed: {e}", case.name));
        }
    }

    #[tokio::test]
    async fn test_projection_pushdown_inexact() -> Result<()> {
        struct TestCase {
            name: &'static str,
            query_to_analyze: &'static str,
            projection: &'static [&'static str],
            expected_plan: Vec<&'static str>,
            expected_output: Vec<&'static str>,
        }

        let cases = &[
            TestCase {
                name: "simple projection",
                query_to_analyze:
                    "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1",
                projection: &["partition_column"],
                expected_plan: vec![
                    "+--------------+--------------------------------------------+",
                    "| plan_type    | plan                                       |",
                    "+--------------+--------------------------------------------+",
                    "| logical_plan | Projection: t1.column1 AS partition_column |",
                    "|              |   TableScan: t1 projection=[column1]       |",
                    "+--------------+--------------------------------------------+",
                ],
                expected_output: vec![
                    "+------------------+",
                    "| partition_column |",
                    "+------------------+",
                    "| 2021             |",
                    "| 2022             |",
                    "| 2023             |",
                    "+------------------+",
                ],
            },
            TestCase {
                name: "compound expressions",
                query_to_analyze: "
                    SELECT DISTINCT
                        to_timestamp_nanos(concat_ws('-', year, month, day)) AS timestamp,
                        feed
                    FROM t2",
                projection: &["timestamp", "feed"],
                expected_plan: vec![
                    "+--------------+-------------------------------------------------------------------------------------------------------+",
                    "| plan_type    | plan                                                                                                  |",
                    "+--------------+-------------------------------------------------------------------------------------------------------+",
                    "| logical_plan | Projection: to_timestamp_nanos(concat_ws(Utf8(\"-\"), t2.year, t2.month, t2.day)) AS timestamp, t2.feed |",
                    "|              |   TableScan: t2 projection=[year, month, day, feed]                                                   |",
                    "+--------------+-------------------------------------------------------------------------------------------------------+",
                ],
                expected_output: vec![
                    "+---------------------+------+",
                    "| timestamp           | feed |",
                    "+---------------------+------+",
                    "| 2023-01-01T00:00:00 | A    |",
                    "| 2023-01-02T00:00:00 | B    |",
                    "| 2023-01-03T00:00:00 | C    |",
                    "| 2024-12-04T00:00:00 | X    |",
                    "| 2024-12-05T00:00:00 | Y    |",
                    "| 2024-12-06T00:00:00 | Z    |",
                    "+---------------------+------+",
                ],
            },
            TestCase {
                name: "empty projection",
                query_to_analyze: "SELECT column1 AS output FROM t3",
                projection: &[],
                expected_plan: vec![
                    "+--------------+-----------------------------+",
                    "| plan_type    | plan                        |",
                    "+--------------+-----------------------------+",
                    "| logical_plan | TableScan: t3 projection=[] |",
                    "+--------------+-----------------------------+",
                ],
                expected_output: vec![
                    "++",
                    "++",
                    "++",
                ],
            },
            TestCase {
                name: "simple equijoin on year",
                query_to_analyze: "SELECT * FROM t2 INNER JOIN t3 USING (year)",
                projection: &["year"],
                expected_plan: vec![
                    "+--------------+-------------------------------------+",
                    "| plan_type    | plan                                |",
                    "+--------------+-------------------------------------+",
                    "| logical_plan | Projection: t2.year                 |",
                    "|              |   Inner Join: t2.year = t3.year     |",
                    "|              |     TableScan: t2 projection=[year] |",
                    "|              |     TableScan: t3 projection=[year] |",
                    "+--------------+-------------------------------------+",
                ],
                expected_output: vec![
                    "+------+",
                    "| year |",
                    "+------+",
                    "| 2023 |",
                    "| 2023 |",
                    "| 2023 |",
                    "| 2024 |",
                    "| 2024 |",
                    "| 2024 |",
                    "+------+",
                ],
            },
            TestCase {
                name: "triangular join on year",
                query_to_analyze: "
                    SELECT
                        t2.*,
                        t3.* EXCLUDE(year),
                        t3.year AS \"t3.year\"
                    FROM t2
                    INNER JOIN t3
                    ON (t2.year <= t3.year)",
                projection: &["year"],
                expected_plan: vec![
                    "+--------------+-------------------------------------------+",
                    "| plan_type    | plan                                      |",
                    "+--------------+-------------------------------------------+",
                    "| logical_plan | Projection: t2.year                       |",
                    "|              |   Inner Join:  Filter: t2.year <= t3.year |",
                    "|              |     TableScan: t2 projection=[year]       |",
                    "|              |     TableScan: t3 projection=[year]       |",
                    "+--------------+-------------------------------------------+",
                ],
                expected_output: vec![
                    "+------+",
                    "| year |",
                    "+------+",
                    "| 2023 |",
                    "| 2023 |",
                    "| 2023 |",
                    "| 2023 |",
                    "| 2023 |",
                    "| 2023 |",
                    "| 2024 |",
                    "| 2024 |",
                    "| 2024 |",
                    "+------+",
                ],
            },
            TestCase {
                name: "window & unnest",
                query_to_analyze: "
                    SELECT
                        \"__unnest_placeholder(date).year\" AS year,
                        \"__unnest_placeholder(date).month\" AS month,
                        \"__unnest_placeholder(date).day\" AS day,
                        arr
                    FROM (
                        SELECT
                            unnest(date),
                            unnest(arr) AS arr
                        FROM (
                            SELECT
                                named_struct('year', year, 'month', month, 'day', day) AS date,
                                array_agg(column2)
                                    OVER (ORDER BY year, month, day)
                                    AS arr
                            FROM t2
                        )
                    )",
                projection: &["year", "month", "day"],
                expected_plan: vec![
                    "+--------------+---------------------------------------------------------------------------------------------------------------------------------------+",
                    "| plan_type    | plan                                                                                                                                  |",
                    "+--------------+---------------------------------------------------------------------------------------------------------------------------------------+",
                    "| logical_plan | Projection: __unnest_placeholder(date).year AS year, __unnest_placeholder(date).month AS month, __unnest_placeholder(date).day AS day |",
                    "|              |   Unnest: lists[] structs[__unnest_placeholder(date)]                                                                                 |",
                    "|              |     Projection: named_struct(Utf8(\"year\"), t2.year, Utf8(\"month\"), t2.month, Utf8(\"day\"), t2.day) AS __unnest_placeholder(date)       |",
                    "|              |       TableScan: t2 projection=[year, month, day]                                                                                     |",
                    "+--------------+---------------------------------------------------------------------------------------------------------------------------------------+",
                ],
                expected_output: vec![
                    "+------+-------+-----+",
                    "| year | month | day |",
                    "+------+-------+-----+",
                    "| 2023 | 01    | 01  |",
                    "| 2023 | 01    | 02  |",
                    "| 2023 | 01    | 03  |",
                    "| 2024 | 12    | 04  |",
                    "| 2024 | 12    | 05  |",
                    "| 2024 | 12    | 06  |",
                    "+------+-------+-----+",
                ],
            },
            TestCase {
                name: "outer join + union",
                query_to_analyze: "
                    SELECT
                        COALESCE(t1.year, t2.year) AS year,
                        t1.column2
                    FROM (SELECT column1 AS year, column2 FROM t1) t1
                    FULL OUTER JOIN (SELECT year, column2 FROM t2) t2
                    USING (year)
                    UNION ALL
                    SELECT year, column1 AS column2 FROM t3
                ",
                projection: &["year"],
                expected_plan: vec![
                    "+--------------+--------------------------------------------------------------------+",
                    "| plan_type    | plan                                                               |",
                    "+--------------+--------------------------------------------------------------------+",
                    "| logical_plan | Union                                                              |",
                    "|              |   Projection: coalesce(CAST(t1.year AS Utf8View), t2.year) AS year |",
                    "|              |     Full Join: Using CAST(t1.year AS Utf8View) = t2.year           |",
                    "|              |       SubqueryAlias: t1                                            |",
                    "|              |         Projection: t1.column1 AS year                             |",
                    "|              |           TableScan: t1 projection=[column1]                       |",
                    "|              |       SubqueryAlias: t2                                            |",
                    "|              |         TableScan: t2 projection=[year]                            |",
                    "|              |   TableScan: t3 projection=[year]                                  |",
                    "+--------------+--------------------------------------------------------------------+",
                ],
                expected_output: vec![
                    "+------+",
                    "| year |",
                    "+------+",
                    "| 2021 |",
                    "| 2022 |",
                    "| 2023 |",
                    "| 2023 |",
                    "| 2023 |",
                    "| 2023 |",
                    "| 2024 |",
                    "| 2024 |",
                    "| 2024 |",
                    "| 2024 |",
                    "+------+",
                ],
            },
        ];

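        // Shared harness: runs the inexact pushdown on each case, verifies the
        // superset property, and compares the rewritten plan and its output
        // against the expectations.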
        async fn run_test(case: &TestCase) -> Result<()> {
            let context = setup().await?;

            let df = context.sql(case.query_to_analyze).await?;
            df.clone().explain(false, false)?.show().await?;

            let plan = df.clone().into_optimized_plan()?;

            let indices = case
                .projection
                .iter()
                .map(|&name| {
                    plan.schema()
                        .index_of_column(&Column::new_unqualified(name))
                })
                .collect::<Result<HashSet<_>>>()?;

            let analyzed = DataFrame::new(
                context.state(),
                pushdown_projection_inexact(plan.clone(), &indices)?,
            );
            analyzed.clone().explain(false, false)?.show().await?;

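            // Superset check: the inexact pushdown may produce extra rows but
            // must never drop any. The right-anti join finds rows of the
            // original projection that the rewritten plan failed to produce;
            // there must be none.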
            if !case.projection.is_empty() {
                let select_original = df
                    .clone()
                    .select(
                        case.projection
                            .iter()
                            .map(|&name| Expr::Column(Column::new_unqualified(name)))
                            .collect_vec(),
                    )
                    .map_err(|e| e.context("select projection from original plan"))?
                    .distinct()?;

                let missing = analyzed
                    .clone()
                    .distinct()?
                    .join(
                        select_original.clone(),
                        JoinType::RightAnti,
                        case.projection,
                        case.projection,
                        None,
                    )
                    .map_err(|e| e.context("join in subset inclusion test"))?;

                assert_eq!(
                    missing
                        .clone()
                        .count()
                        .await
                        .map_err(|e| e.context("execute subset inclusion test"))?,
                    0,
                    "rows from the original query are missing from the transformed query:\n{}\noriginal:\n{}\ninexact pushdown:\n{}",
                    pretty_format_batches(&missing.collect().await?)?,
                    pretty_format_batches(&select_original.collect().await?)?,
                    pretty_format_batches(&analyzed.clone().distinct()?.collect().await?)?
                );
            }

            assert_batches_eq!(
                case.expected_plan,
                &analyzed.clone().explain(false, false)?.collect().await?
            );
            assert_batches_sorted_eq!(case.expected_output, &analyzed.collect().await?);

            Ok(())
        }

        for case in cases {
            run_test(case)
                .await
                .unwrap_or_else(|e| panic!("{} failed: {e}", case.name));
        }

        Ok(())
    }

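    /// When only the `date` column is requested from a plan containing an
    /// `Unnest`, the pushdown should drop the unnested list column entirely
    /// and leave a plan whose schema is exactly `["date"]`.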
    #[test]
    fn test_pushdown_unnest_guard_partition_date_only() -> Result<()> {
        let id = Field::new("id", DataType::Utf8, true);
        let date = Field::new("date", DataType::Utf8, true);

        let event_type = Field::new("event_type", DataType::Utf8, true);
        let event_time = Field::new("event_time", DataType::Utf8, true);
        let events_struct = Field::new(
            "item",
            DataType::Struct(Fields::from(vec![event_type, event_time])),
            true,
        );
        let events = Field::new(
            "events",
            DataType::List(FieldRef::from(Box::new(events_struct))),
            true,
        );

        let qualified_fields = vec![
            (None, Arc::new(id)),
            (None, Arc::new(date)),
            (None, Arc::new(events)),
        ];
        let df_schema =
            DFSchema::new_with_metadata(qualified_fields, std::collections::HashMap::new())?;

        let empty = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(df_schema),
        });

        let unnest_plan = unnest(empty, vec![Column::from_name("events")])?;

        let date_idx = unnest_plan
            .schema()
            .index_of_column(&Column::from_name("date"))?;
        let mut indices: HashSet<usize> = HashSet::new();
        indices.insert(date_idx);

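        // Push a projection keeping only `date` through the unnest plan; the
        // resulting schema should contain just that column.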
        let res = pushdown_projection_inexact(unnest_plan, &indices)?;

        let cols: Vec<String> = res
            .schema()
            .fields()
            .iter()
            .map(|f| f.name().to_string())
            .collect();

        assert_eq!(cols, vec!["date"]);

        Ok(())
    }
}