datafusion_materialized_views/materialized/
dependencies.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18/*!
19
20This module implements a dependency analysis algorithm for materialized views, heavily based on the [`ListingTableLike`](super::ListingTableLike) trait.
21Note that materialized views may depend on tables that are not `ListingTableLike`, as long as they have custom metadata explicitly installed
22into the [`RowMetadataRegistry`]. However, materialized views themself must implement `ListingTableLike`, as is
23implied by the type bound `Materialized: ListingTableLike`.
24
25The dependency analysis in a nutshell involves analyzing the fragment of the materialized view's logical plan corresponding to
26partition columns (or row metadata columns more generally). This logical fragment is then used to generate a dependency graph between physical partitions
27of the materialized view and its source tables. This gives rise to two natural phases of the algorithm:
281. **Inexact Projection Pushdown**: We aggressively prune the logical plan to only include partition columns (or row metadata columns more generally) of the materialized view and its sources.
29   This is similar to pushing down a top-level projection on the materialized view's partition columns. However, "inexact" means that we do not preserve duplicates, order,
30   or even set equality of the original query.
31   * Formally, let P be the (exact) projection operator. If A is the original plan and A' is the result of "inexact" projection pushdown, we have PA ⊆ A'.
32   * This means that in the final output, we may have dependencies that do not exist in the original query. However, we will never miss any dependencies.
332. **Dependency Graph Construction**: Once we have the pruned logical plan, we can construct a dependency graph between the physical partitions of the materialized view and its sources.
34   After step 1, every table scan only contains row metadata columns, so we replace the table scan with an equivalent scan to a [`RowMetadataSource`](super::row_metadata::RowMetadataSource)
35   This operation also is not duplicate or order preserving. Then, additional metadata is "pushed up" through the plan to the root, where it can be unnested to give a list of source files for each output row.
36   The output rows are then transformed into object storage paths to generate the final graph.
37
38The transformation is complex, and we give a full walkthrough in the documentation for [`mv_dependencies_plan`].
39 */
40
41use datafusion::{
42    catalog::{CatalogProviderList, TableFunctionImpl},
43    config::{CatalogOptions, ConfigOptions},
44    datasource::{provider_as_source, TableProvider, ViewTable},
45    prelude::{flatten, get_field, make_array},
46};
47use datafusion_common::{
48    alias::AliasGenerator,
49    internal_err,
50    tree_node::{Transformed, TreeNode},
51    DFSchema, DataFusionError, Result, ScalarValue,
52};
53use datafusion_expr::{
54    col, lit, utils::split_conjunction, Expr, LogicalPlan, LogicalPlanBuilder, TableScan,
55};
56use datafusion_functions::string::expr_fn::{concat, concat_ws};
57use datafusion_sql::TableReference;
58use itertools::{Either, Itertools};
59use std::{collections::HashSet, sync::Arc};
60
61use crate::materialized::META_COLUMN;
62
63use super::{cast_to_materialized, row_metadata::RowMetadataRegistry, util, Materialized};
64
65/// A table function that, for a given materialized view, lists all the output data objects (build targets)
66/// generated during its construction or refresh, as well as all the source data objects (dependencies) it relies on.
67///
68/// ```ignore
69/// fn mv_dependencies(table_ref: Utf8) -> Table
70/// ```
71///
72/// `table_ref` should point to a table provider registered for the current session
73/// that implements [`Materialized`]. Otherwise the function will throw an error.
74///
75/// # Example
76///
77/// ```sql
78/// SELECT * FROM mv_dependencies('datafusion.public.m1');
79///
80/// +--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
81/// | target                         | source_table_catalog | source_table_schema | source_table_name | source_uri                           | source_last_modified |
82/// +--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
83/// | s3://m1/partition_column=2021/ | datafusion           | public              | t1                | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26  |
84/// | s3://m1/partition_column=2022/ | datafusion           | public              | t1                | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22  |
85/// | s3://m1/partition_column=2023/ | datafusion           | public              | t1                | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44  |
86/// +--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+
87/// ```
88pub fn mv_dependencies(
89    catalog_list: Arc<dyn CatalogProviderList>,
90    row_metadata_registry: Arc<RowMetadataRegistry>,
91    options: &ConfigOptions,
92) -> Arc<dyn TableFunctionImpl + 'static> {
93    Arc::new(FileDependenciesUdtf::new(
94        catalog_list,
95        row_metadata_registry,
96        options,
97    ))
98}
99
/// Implementation of the `mv_dependencies` table function.
///
/// Constructed via [`mv_dependencies`] and invoked through
/// [`TableFunctionImpl::call`] with a single table-name argument.
#[derive(Debug)]
struct FileDependenciesUdtf {
    /// Catalogs used to resolve the materialized view's table reference.
    catalog_list: Arc<dyn CatalogProviderList>,
    /// Registry mapping source tables to their row-metadata sources.
    row_metadata_registry: Arc<RowMetadataRegistry>,
    /// Snapshot of session config; supplies the default catalog/schema
    /// used when resolving unqualified table names.
    config_options: ConfigOptions,
}
106
107impl FileDependenciesUdtf {
108    fn new(
109        catalog_list: Arc<dyn CatalogProviderList>,
110        row_metadata_registry: Arc<RowMetadataRegistry>,
111        config_options: &ConfigOptions,
112    ) -> Self {
113        Self {
114            catalog_list,
115            config_options: config_options.clone(),
116            row_metadata_registry,
117        }
118    }
119}
120
121impl TableFunctionImpl for FileDependenciesUdtf {
122    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
123        let table_name = get_table_name(args)?;
124
125        let table_ref = TableReference::from(table_name).resolve(
126            &self.config_options.catalog.default_catalog,
127            &self.config_options.catalog.default_schema,
128        );
129
130        let table = util::get_table(self.catalog_list.as_ref(), &table_ref)
131            .map_err(|e| DataFusionError::Plan(e.to_string()))?;
132
133        let mv = cast_to_materialized(table.as_ref())?.ok_or(DataFusionError::Plan(format!(
134            "mv_dependencies: table '{table_name} is not a materialized view. (Materialized TableProviders must be registered using register_materialized"),
135        ))?;
136
137        Ok(Arc::new(ViewTable::new(
138            mv_dependencies_plan(
139                mv,
140                self.row_metadata_registry.as_ref(),
141                &self.config_options,
142            )?,
143            None,
144        )))
145    }
146}
147
148/// A table function that shows which files need to be regenerated.
149/// Checks `last_modified` timestamps from the file metadata table
150/// and deems a target stale if any of its sources are newer than it.
151///
152/// # `file_metadata`
153///
154/// Accepts a [`TableProvider`] whose schema matches that of [`FileMetadata`](super::file_metadata::FileMetadata).
155/// Normally, a `FileMetadata` may be passed in as normal, but custom file metadata sources or mock data can be passed in
156/// with a user-provided `TableProvider`.
157pub fn stale_files(
158    catalog_list: Arc<dyn CatalogProviderList>,
159    row_metadata_registry: Arc<RowMetadataRegistry>,
160    file_metadata: Arc<dyn TableProvider>,
161    config_options: &ConfigOptions,
162) -> Arc<dyn TableFunctionImpl + 'static> {
163    Arc::new(StaleFilesUdtf {
164        mv_dependencies: FileDependenciesUdtf {
165            catalog_list,
166            row_metadata_registry,
167            config_options: config_options.clone(),
168        },
169        file_metadata,
170    })
171}
172
/// Implementation of the `stale_files` table function.
///
/// Combines the output of [`mv_dependencies`] with a file metadata table to
/// determine which build targets are out of date.
#[derive(Debug)]
struct StaleFilesUdtf {
    /// Inner UDTF producing the (target, source) dependency listing.
    mv_dependencies: FileDependenciesUdtf,
    /// Table listing existing files; schema must match
    /// `FileMetadata` (see module docs on `stale_files`).
    file_metadata: Arc<dyn TableProvider>,
}
178
179impl TableFunctionImpl for StaleFilesUdtf {
180    fn call(&self, args: &[Expr]) -> Result<Arc<dyn TableProvider>> {
181        use datafusion::prelude::*;
182        use datafusion_functions_aggregate::min_max::max;
183
184        let dependencies = provider_as_source(self.mv_dependencies.call(args)?);
185
186        let table_name = get_table_name(args)?;
187
188        let table_ref = TableReference::from(table_name).resolve(
189            &self.mv_dependencies.config_options.catalog.default_catalog,
190            &self.mv_dependencies.config_options.catalog.default_schema,
191        );
192
193        let table = util::get_table(self.mv_dependencies.catalog_list.as_ref(), &table_ref)
194            .map_err(|e| DataFusionError::Plan(e.to_string()))?;
195        let mv = cast_to_materialized(table.as_ref())?.ok_or(DataFusionError::Plan(format!(
196            "mv_dependencies: table '{table_name} is not a materialized view. (Materialized TableProviders must be registered using register_materialized"),
197        ))?;
198
199        let url = mv.table_paths()[0].to_string();
200        let num_static_partition_cols = mv.static_partition_columns().len();
201
202        let logical_plan =
203            LogicalPlanBuilder::scan_with_filters("dependencies", dependencies, None, vec![])?
204                .aggregate(
205                    vec![col("dependencies.target").alias("expected_target")],
206                    vec![max(col("source_last_modified")).alias("sources_last_modified")],
207                )?
208                .join(
209                    LogicalPlanBuilder::scan_with_filters(
210                        "file_metadata",
211                        provider_as_source(
212                            Arc::clone(&self.file_metadata) as Arc<dyn TableProvider>
213                        ),
214                        None,
215                        vec![
216                            col("table_catalog").eq(lit(table_ref.catalog.as_ref())),
217                            col("table_schema").eq(lit(table_ref.schema.as_ref())),
218                            col("table_name").eq(lit(table_ref.table.as_ref())),
219                        ],
220                    )?
221                    .aggregate(
222                        vec![
223                            // Omit the file name along with any "special" partitions.
224                            // This can include dynamic partition columns as well as some internal
225                            // metadata columns that are not part of the schema
226                            //
227                            // We implement this by only taking the first N columns,
228                            // where N is the number of static partition columns.
229                            array_element(
230                                regexp_match(
231                                    col("file_path"),
232                                    lit(format!(
233                                        "{url}(?:[^/=]+=[^/]+/){{{num_static_partition_cols}}}"
234                                    )),
235                                    None,
236                                ),
237                                lit(1),
238                            )
239                            .alias("existing_target"),
240                        ],
241                        vec![max(col("last_modified")).alias("target_last_modified")],
242                    )?
243                    .project(vec![col("existing_target"), col("target_last_modified")])?
244                    .build()?,
245                    JoinType::Left,
246                    (vec!["expected_target"], vec!["existing_target"]),
247                    None,
248                )?
249                .project(vec![
250                    col("expected_target").alias("target"),
251                    col("target_last_modified"),
252                    col("sources_last_modified"),
253                    nvl(
254                        col("target_last_modified"),
255                        lit(ScalarValue::TimestampNanosecond(
256                            Some(0),
257                            Some(Arc::from("UTC")),
258                        )),
259                    )
260                    .lt(col("sources_last_modified"))
261                    .alias("is_stale"),
262                ])?
263                .build()?;
264
265        Ok(Arc::new(ViewTable::new(logical_plan, None)))
266    }
267}
268
269/// Extract table name from args passed to TableFunctionImpl::call()
270fn get_table_name(args: &[Expr]) -> Result<&String> {
271    match &args[0] {
272        Expr::Literal(ScalarValue::Utf8(Some(table_name)), _) => Ok(table_name),
273        _ => Err(DataFusionError::Plan(
274            "expected a single string literal argument to mv_dependencies".to_string(),
275        )),
276    }
277}
278
279#[cfg_attr(doc, aquamarine::aquamarine)]
280/// Returns a logical plan that, when executed, lists expected build targets
281/// for this materialized view, together with the dependencies for each target.
282///
283/// See the [module documentation](super) for an overview of the algorithm.
284///
285/// # Example
286///
287/// We explain in detail how the dependency analysis works in an example. Consider the following SQL query, which computes daily
288/// close prices of a stock from its trades, together with the settlement price from a daily statistics table:
289///
290/// ```sql
291/// SELECT
292///     ticker,
293///     LAST_VALUE(trades.price) AS close,
294///     LAST_VALUE(daily_statistics.settlement_price) AS settlement_price,
295///     trades.date AS date
296/// FROM trades
297/// JOIN daily_statistics ON
298///     trades.ticker = daily_statistics.ticker AND
299///     trades.date = daily_statistics.reference_date AND
300///     daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS
301/// GROUP BY ticker, date
302/// ```
303///
/// Assume that both tables are partitioned by `date` only. We desire a materialized view partitioned by `date` and stored at `s3://daily_close/`.
305/// This query gives us the following logical plan:
306///
307/// ```mermaid
308/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%%
309/// graph TD
310///     A["Projection: <br>ticker, LAST_VALUE(trades.price) AS close, LAST_VALUE(daily_statistics.settlement_price) AS settlement_price, <mark>trades.date AS date</mark>"]
311///     A --> B["Aggregate: <br>expr=[LAST_VALUE(trades.price), LAST_VALUE(daily_statistics.settlement_price)] <br>groupby=[ticker, <mark>trades.date</mark>]"]
312///     B --> C["Inner Join: <br>trades.ticker = daily_statistics.ticker AND <br>trades.date = daily_statistics.reference_date AND <br><mark>daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS</mark>"]
313///     C --> D["TableScan: trades <br>projection=[ticker, price, <mark>date</mark>]"]
314///     C --> E["TableScan: daily_statistics <br>projection=[ticker, settlement_price, reference_date, <mark>date</mark>]"]
315/// ```
316///
317/// All partition-column-derived expressions are marked in yellow. We now proceed with **Inexact Projection Pushdown**, and prune all unmarked expressions, resulting in the following plan:
318///
319/// ```mermaid
320/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%%
321/// graph TD
322///     A["Projection: trades.date AS date"]
323///     A --> B["Projection: trades.date"]
324///     B --> C["Inner Join: <br>daily_statistics.date BETWEEN trades.date AND trades.date + INTERVAL 2 WEEKS"]
325///     C --> D["TableScan: trades (projection=[date])"]
326///     C --> E["TableScan: daily_statistics (projection=[date])"]
327/// ```
328///
329/// Note that the `Aggregate` node was converted into a projection. This is valid because we do not need to preserve duplicate rows. However, it does imply that
330/// we cannot partition the materialized view on aggregate expressions.
331///
332/// Now we substitute all scans with equivalent row metadata scans (up to addition or removal of duplicates), and push up the row metadata to the root of the plan,
333/// together with the target path constructed from the (static) partition columns. This gives us the following plan:
334///
335/// ```mermaid
336/// %%{init: { 'flowchart': { 'wrappingWidth': 1000 }}}%%
337/// graph TD
338///     A["Projection: concat('s3://daily_close/date=', date::string, '/') AS target, __meta"]
339///     A --> B["Projection: __meta, trades.date AS date"]
340///     B --> C["Projection: <br>concat(trades_meta.__meta, daily_statistics_meta.__meta) AS __meta, date"]
341///     C --> D["Inner Join: <br><b>daily_statistics_meta</b>.date BETWEEN <b>trades_meta</b>.date AND <b>trades_meta</b>.date + INTERVAL 2 WEEKS"]
342///     D --> E["TableScan: <b>trades_meta</b> (projection=[__meta, date])"]
343///     D --> F["TableScan: <b>daily_statistics_meta</b> (projection=[__meta, date])"]
344/// ```
345///
346/// Here, `__meta` is a column containing a list of structs with the row metadata for each source file. The final query has this struct column
347/// unnested into its components. The final output looks roughly like this:
348///
349/// ```text
350/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+
351/// | target                            | source_table_catalog | source_table_schema | source_table_name | source_uri                                            | source_last_modified |
352/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+
353/// | s3://daily_close/date=2023-01-01/ | datafusion           | public              | trades            | s3://trades/date=2023-01-01/data.01.parquet           | 2023-07-11T16:29:26  |
354/// | s3://daily_close/date=2023-01-01/ | datafusion           | public              | daily_statistics  | s3://daily_statistics/date=2023-01-07/data.01.parquet | 2023-07-11T16:45:22  |
355/// | s3://daily_close/date=2023-01-02/ | datafusion           | public              | trades            | s3://trades/date=2023-01-02/data.01.parquet           | 2023-07-11T16:45:44  |
356/// | s3://daily_close/date=2023-01-02/ | datafusion           | public              | daily_statistics  | s3://daily_statistics/date=2023-01-07/data.01.parquet | 2023-07-11T16:46:10  |
357/// +-----------------------------------+----------------------+---------------------+-------------------+-------------------------------------------------------+----------------------+
358/// ```
pub fn mv_dependencies_plan(
    materialized_view: &dyn Materialized,
    row_metadata_registry: &RowMetadataRegistry,
    config_options: &ConfigOptions,
) -> Result<LogicalPlan> {
    use datafusion_expr::logical_plan::*;

    let plan = materialized_view.query().clone();

    // Indices of the static partition columns within the view's output schema;
    // these are the only columns the pruned plan needs to preserve.
    let static_partition_cols = materialized_view.static_partition_columns();
    let static_partition_col_indices = plan
        .schema()
        .fields()
        .iter()
        .enumerate()
        .filter_map(|(i, f)| static_partition_cols.contains(f.name()).then_some(i))
        .collect();

    let pruned_plan_with_source_files = if static_partition_cols.is_empty() {
        // Unpartitioned view: every source file is a dependency of the single target.
        get_source_files_all_partitions(
            materialized_view,
            &config_options.catalog,
            row_metadata_registry,
        )
    } else {
        // Prune non-partition columns from all table scans
        // (inexact projection pushdown, see module docs)
        let pruned_plan = pushdown_projection_inexact(plan, &static_partition_col_indices)?;

        // Now bubble up file metadata to the top of the plan
        push_up_file_metadata(pruned_plan, &config_options.catalog, row_metadata_registry)
    }?;

    // We now have data in the following form:
    // (static_partition_col0, static_partition_col1, ..., __meta)
    // The last column is a list of structs containing the row metadata
    // We need to unnest it

    // Find the single column whose name starts with the '__meta' prefix
    // (alias generation may have appended a suffix to the bare name)
    let files = pruned_plan_with_source_files
        .schema()
        .columns()
        .into_iter()
        .find(|c| c.name.starts_with(META_COLUMN))
        .ok_or_else(|| DataFusionError::Plan(format!("Plan contains no {META_COLUMN} column")))?;
    let files_col = Expr::Column(files.clone());

    // Unnest to one row per (target, source file) pair, project each struct
    // field into its own column, and deduplicate (the pruned plan may have
    // introduced duplicates — see module docs on inexactness).
    LogicalPlanBuilder::from(pruned_plan_with_source_files)
        .unnest_column(files)?
        .project(vec![
            construct_target_path_from_static_partition_columns(materialized_view).alias("target"),
            get_field(files_col.clone(), "table_catalog").alias("source_table_catalog"),
            get_field(files_col.clone(), "table_schema").alias("source_table_schema"),
            get_field(files_col.clone(), "table_name").alias("source_table_name"),
            get_field(files_col.clone(), "source_uri").alias("source_uri"),
            get_field(files_col.clone(), "last_modified").alias("source_last_modified"),
        ])?
        .distinct()?
        .build()
}
418
419fn construct_target_path_from_static_partition_columns(
420    materialized_view: &dyn Materialized,
421) -> Expr {
422    let table_path = lit(materialized_view.table_paths()[0]
423        .as_str()
424        // Trim the / (we'll add it back later if we need it)
425        .trim_end_matches("/"));
426    // Construct the paths for the build targets
427    let mut hive_column_path_elements = materialized_view
428        .static_partition_columns()
429        .iter()
430        .map(|column_name| concat([lit(column_name.as_str()), lit("="), col(column_name)].to_vec()))
431        .collect::<Vec<_>>();
432    hive_column_path_elements.insert(0, table_path);
433
434    concat(vec![
435        // concat_ws doesn't work if there are < 2 elements to concat
436        if hive_column_path_elements.len() == 1 {
437            hive_column_path_elements.pop().unwrap()
438        } else {
439            concat_ws(lit("/"), hive_column_path_elements)
440        },
441        // Always need a trailing slash on directory paths
442        lit("/"),
443    ])
444}
445
446/// An implementation of "inexact" projection pushdown that eliminates aggregations, windows, sorts, & limits.
447/// Does not preserve order or row multiplicity and may return rows outside of the original projection.
448/// However, it has the following property:
449/// Let P be a projection operator.
450/// If A is the original plan and A' is the result of "inexact" projection pushdown, we have PA ⊆ A'.
451///
452/// The purpose is to be as aggressive as possible with projection pushdown at the sacrifice of exactness.
453fn pushdown_projection_inexact(plan: LogicalPlan, indices: &HashSet<usize>) -> Result<LogicalPlan> {
454    use datafusion_expr::logical_plan::*;
455
456    let plan_formatted = format!("{}", plan.display());
457    match plan {
458        LogicalPlan::Projection(Projection { expr, input, .. }) => {
459            let new_exprs = expr
460                .into_iter()
461                .enumerate()
462                .filter_map(|(i, expr)| indices.contains(&i).then_some(expr))
463                .collect_vec();
464
465            let child_indices = new_exprs
466                .iter()
467                .flat_map(|e| e.column_refs().into_iter())
468                .map(|c| input.schema().index_of_column(c).unwrap())
469                .collect::<HashSet<_>>();
470
471            Projection::try_new(
472                new_exprs,
473                pushdown_projection_inexact(Arc::unwrap_or_clone(input), &child_indices)
474                    .map(Arc::new)?,
475            )
476            .map(LogicalPlan::Projection)
477        }
478        LogicalPlan::Filter(ref filter) => {
479            let mut indices = indices.clone();
480
481            let new_filter = widen_filter(&filter.predicate, &mut indices, &plan)?;
482
483            let filter = match plan {
484                LogicalPlan::Filter(filter) => filter,
485                _ => unreachable!(),
486            };
487
488            Filter::try_new(
489                new_filter,
490                pushdown_projection_inexact(Arc::unwrap_or_clone(filter.input), &indices)
491                    .map(Arc::new)?,
492            )
493            .map(LogicalPlan::Filter)
494        }
495        LogicalPlan::Window(Window {
496            input,
497            window_expr: _,
498            ..
499        }) => {
500            // Window nodes take their input and append window expressions to the end.
501            // If our projection doesn't include window expressions, we can just turn
502            // the window into a regular projection.
503            let num_non_window_cols = input.schema().fields().len();
504            if indices.iter().any(|&i| i >= num_non_window_cols) {
505                return internal_err!("Can't push down projection through window functions");
506            }
507
508            pushdown_projection_inexact(Arc::unwrap_or_clone(input), indices)
509        }
510        LogicalPlan::Aggregate(Aggregate {
511            input, group_expr, ..
512        }) => {
513            // Aggregate node schemas are the GROUP BY expressions followed by the aggregate expressions.
514            let num_group_exprs = group_expr.len();
515            if indices.iter().any(|&i| i >= num_group_exprs) {
516                return internal_err!("Can't push down projection through aggregate functions");
517            }
518
519            let new_exprs = group_expr
520                .into_iter()
521                .enumerate()
522                .filter_map(|(i, expr)| indices.contains(&i).then_some(expr))
523                .collect_vec();
524
525            let child_indices = new_exprs
526                .iter()
527                .flat_map(|e| e.column_refs().into_iter())
528                .map(|c| input.schema().index_of_column(c).unwrap())
529                .collect::<HashSet<_>>();
530
531            Projection::try_new(
532                new_exprs,
533                pushdown_projection_inexact(Arc::unwrap_or_clone(input), &child_indices)
534                    .map(Arc::new)?,
535            )
536            .map(LogicalPlan::Projection)
537        }
538        LogicalPlan::Join(ref join) => {
539            let join_type = join.join_type;
540            match join_type {
541                JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => {}
542                _ => {
543                    return Err(DataFusionError::Internal(format!(
544                        "unsupported join type: {join_type}"
545                    )))
546                }
547            };
548
549            let mut indices = indices.clone();
550
551            // Relax the filter so that it can be computed from the
552            // "pruned" children
553            let filter = join
554                .filter
555                .as_ref()
556                .map(|f| widen_filter(f, &mut indices, &plan))
557                .transpose()?;
558
559            let (mut left_child_indices, mut right_child_indices) =
560                indices.iter().partition_map(|&i| {
561                    if i < join.left.schema().fields().len() {
562                        Either::Left(i)
563                    } else {
564                        Either::Right(i - join.left.schema().fields().len())
565                    }
566                });
567
568            let on = join.on.iter().try_fold(vec![], |mut v, (lexpr, rexpr)| {
569                // The ON clause includes filters like `lexpr = rexpr`
570                // If either side is considered 'relevant', we include it.
571                // See documentation for [`expr_is_relevant`].
572                if expr_is_relevant(lexpr, &left_child_indices, &join.left)?
573                    || expr_is_relevant(rexpr, &right_child_indices, &join.right)?
574                {
575                    add_all_columns_to_indices(lexpr, &mut left_child_indices, &join.left)?;
576                    add_all_columns_to_indices(rexpr, &mut right_child_indices, &join.right)?;
577                    v.push((lexpr.clone(), rexpr.clone()))
578                }
579
580                Ok::<_, DataFusionError>(v)
581            })?;
582
583            let join = match plan {
584                LogicalPlan::Join(join) => join,
585                _ => unreachable!(),
586            };
587
588            let left =
589                pushdown_projection_inexact(Arc::unwrap_or_clone(join.left), &left_child_indices)
590                    .map(Arc::new)?;
591            let right =
592                pushdown_projection_inexact(Arc::unwrap_or_clone(join.right), &right_child_indices)
593                    .map(Arc::new)?;
594
595            let schema = project_dfschema(join.schema.as_ref(), &indices).map(Arc::new)?;
596
597            Ok(LogicalPlan::Join(Join {
598                left,
599                right,
600                on,
601                filter,
602                join_type,
603                schema,
604                ..join
605            }))
606        }
607        LogicalPlan::Union(Union { inputs, schema, .. }) => {
608            let inputs = inputs
609                .into_iter()
610                .map(Arc::unwrap_or_clone)
611                .map(|plan| pushdown_projection_inexact(plan, indices))
612                .map_ok(Arc::new)
613                .collect::<Result<Vec<_>>>()?;
614
615            Ok(LogicalPlan::Union(Union {
616                inputs,
617                schema: project_dfschema(schema.as_ref(), indices).map(Arc::new)?,
618            }))
619        }
620        LogicalPlan::TableScan(ref scan) => {
621            let mut indices = indices.clone();
622            let filters = scan
623                .filters
624                .iter()
625                .map(|f| widen_filter(f, &mut indices, &plan))
626                .collect::<Result<Vec<_>>>()?;
627
628            let new_projection = scan
629                .projection
630                .clone()
631                .unwrap_or((0..scan.source.schema().fields().len()).collect())
632                .into_iter()
633                .enumerate()
634                .filter_map(|(i, j)| indices.contains(&i).then_some(j))
635                .collect_vec();
636
637            let scan = match plan {
638                LogicalPlan::TableScan(scan) => scan,
639                _ => unreachable!(),
640            };
641
642            TableScan::try_new(
643                scan.table_name,
644                scan.source,
645                Some(new_projection),
646                filters,
647                None,
648            )
649            .map(LogicalPlan::TableScan)
650        }
651        LogicalPlan::EmptyRelation(EmptyRelation {
652            produce_one_row,
653            schema,
654        }) => Ok(LogicalPlan::EmptyRelation(EmptyRelation {
655            produce_one_row,
656            schema: project_dfschema(schema.as_ref(), indices).map(Arc::new)?,
657        })),
658        LogicalPlan::SubqueryAlias(SubqueryAlias { input, alias, .. }) => SubqueryAlias::try_new(
659            pushdown_projection_inexact(Arc::unwrap_or_clone(input), indices).map(Arc::new)?,
660            alias,
661        )
662        .map(LogicalPlan::SubqueryAlias),
663        LogicalPlan::Limit(Limit { input, .. }) | LogicalPlan::Sort(Sort { input, .. }) => {
664            // Ignore sorts/limits entirely and remove them from the plan
665            pushdown_projection_inexact(Arc::unwrap_or_clone(input), indices)
666        }
667        LogicalPlan::Values(Values { schema, values }) => {
668            let schema = project_dfschema(&schema, indices).map(Arc::new)?;
669            let values = values
670                .into_iter()
671                .map(|row| {
672                    row.into_iter()
673                        .enumerate()
674                        .filter_map(|(i, v)| indices.contains(&i).then_some(v))
675                        .collect_vec()
676                })
677                .collect_vec();
678
679            Ok(LogicalPlan::Values(Values { schema, values }))
680        }
681        LogicalPlan::Distinct(Distinct::All(input)) => {
682            pushdown_projection_inexact(Arc::unwrap_or_clone(input), indices)
683                .map(Arc::new)
684                .map(Distinct::All)
685                .map(LogicalPlan::Distinct)
686        }
687        LogicalPlan::Unnest(unnest) => {
688            // Map parent indices to child indices.
689            // The columns of an unnest node have a many-to-one relation
690            // to the columns of the input.
691            let child_indices = indices
692                .iter()
693                .map(|&i| unnest.dependency_indices[i])
694                .collect::<HashSet<_>>();
695
696            let input_using_columns = unnest.input.using_columns()?;
697            let input_schema = unnest.input.schema();
698            let columns_to_unnest =
699                unnest
700                    .exec_columns
701                    .into_iter()
702                    .try_fold(vec![], |mut v, c| {
703                        let c = c.normalize_with_schemas_and_ambiguity_check(
704                            &[&[input_schema.as_ref()]],
705                            &input_using_columns,
706                        )?;
707                        let idx = input_schema.index_of_column(&c)?;
708                        if child_indices.contains(&idx) {
709                            v.push(c);
710                        }
711
712                        Ok::<_, DataFusionError>(v)
713                    })?;
714
715            let columns_to_project = unnest
716                .schema
717                .columns()
718                .into_iter()
719                .enumerate()
720                .filter_map(|(i, c)| indices.contains(&i).then_some(c))
721                .map(Expr::Column)
722                .collect_vec();
723
724            // GUARD: if after pushdown the set of relevant unnest columns is empty,
725            // avoid constructing an Unnest node with zero exec columns (which will
726            // later error in Unnest::try_new). Instead, simply project the
727            // desired output columns from the child plan (after pushing down the child projection).
728            // Related PR: https://github.com/apache/datafusion/pull/16632, after that we must
729            // also check for empty exec columns here.
730            if columns_to_unnest.is_empty() {
731                return LogicalPlanBuilder::from(pushdown_projection_inexact(
732                    Arc::unwrap_or_clone(unnest.input),
733                    &child_indices,
734                )?)
735                .project(columns_to_project)?
736                .build();
737            }
738
739            LogicalPlanBuilder::from(pushdown_projection_inexact(
740                Arc::unwrap_or_clone(unnest.input),
741                &child_indices,
742            )?)
743            .unnest_columns_with_options(columns_to_unnest, unnest.options)?
744            .project(columns_to_project)?
745            .build()
746        }
747
748        _ => internal_err!("Unsupported logical plan node: {}", plan.display()),
749    }
750    .map_err(|e| e.context(format!("plan: \n{plan_formatted}")))
751}
752
753/// 'Widen' a filter, i.e. given a predicate P,
754/// compute P' such that P' is true whenever P is.
755/// In particular, P' should be computed using columns whose indices are in `indices`.
756///
757/// # Mutating `indices`
758///
759/// Currently under some conditions this function will add new entries to `indices`.
760/// This is particularly important in some cases involving joins. For example,
761/// consider the following plan:
762///
763/// ```ignore
764/// Projection: t2.year, t2.month, t2.day, t2.feed, t2.column2, t3.column1
765///  Inner Join: Using t2.year = t3.year
766///  TableScan: t2
767///  TableScan: t3
768/// ```
769///
770/// If we want to prune all parts of the plan not related to t2.year, we'd get something like this:
771///
772/// ```ignore
773/// Projection: t2.year
774///  Inner Join: Using
775///  TableScan: t2 projection=[year]
776///  TableScan: t3 projection=[]
777/// ```
778///
779/// Notice that the filter in the inner join is gone. This is because `t3.year` is not obviously referenced in the definition of `t2.year`;
780/// it is only implicitly used in the join filter.
781///
782/// To get around this, we look at filter expressions, and if they contain a _single_ column in the index set,
783/// we add the rest of the columns from the filter to the index set, to ensure all of the filter's inputs
784/// will be present.
785fn widen_filter(
786    predicate: &Expr,
787    indices: &mut HashSet<usize>,
788    parent: &LogicalPlan,
789) -> Result<Expr> {
790    let conjunctions = split_conjunction(predicate);
791
792    conjunctions.into_iter().try_fold(lit(true), |a, b| {
793        Ok(if expr_is_relevant(b, indices, parent)? {
794            add_all_columns_to_indices(b, indices, parent)?;
795            a.and(b.clone())
796        } else {
797            a
798        })
799    })
800}
801
802/// An expression is considered 'relevant' if a single column is inside our index set.
803fn expr_is_relevant(expr: &Expr, indices: &HashSet<usize>, parent: &LogicalPlan) -> Result<bool> {
804    let schemas = parent
805        .inputs()
806        .iter()
807        .map(|input| input.schema().as_ref())
808        .collect_vec();
809    let using_columns = parent.using_columns()?;
810
811    for c in expr.column_refs() {
812        let normalized_column = c
813            .clone()
814            .normalize_with_schemas_and_ambiguity_check(&[&schemas], &using_columns)?;
815        let column_idx = parent.schema().index_of_column(&normalized_column)?;
816
817        if indices.contains(&column_idx) {
818            return Ok(true);
819        }
820    }
821
822    Ok(false)
823}
824
825/// Get all referenced columns in the expression,
826/// and add them to the index set.
827fn add_all_columns_to_indices(
828    expr: &Expr,
829    indices: &mut HashSet<usize>,
830    parent: &LogicalPlan,
831) -> Result<()> {
832    let schemas = parent
833        .inputs()
834        .iter()
835        .map(|input| input.schema().as_ref())
836        .collect_vec();
837    let using_columns = parent.using_columns()?;
838
839    for c in expr.column_refs() {
840        let normalized_column = c
841            .clone()
842            .normalize_with_schemas_and_ambiguity_check(&[&schemas], &using_columns)?;
843        let column_idx = parent.schema().index_of_column(&normalized_column)?;
844
845        indices.insert(column_idx);
846    }
847
848    Ok(())
849}
850
851fn project_dfschema(schema: &DFSchema, indices: &HashSet<usize>) -> Result<DFSchema> {
852    let qualified_fields = (0..schema.fields().len())
853        .filter_map(|i| {
854            indices.contains(&i).then_some({
855                let (reference, field) = schema.qualified_field(i);
856                (reference.cloned(), Arc::new(field.clone()))
857            })
858        })
859        .collect_vec();
860
861    // todo: handle functional dependencies
862    DFSchema::new_with_metadata(qualified_fields, schema.metadata().clone())
863}
864
865/// Rewrite TableScans on top of the file metadata table,
866/// assuming the query only uses the S3 partition columns.
867/// Then push up the file metadata to the output of this plan.
868///
869/// The result will have a single new column with an autogenerated name "__meta_<id>"
870/// which contains the source file metadata for a given row in the output.
871fn push_up_file_metadata(
872    plan: LogicalPlan,
873    catalog_options: &CatalogOptions,
874    row_metadata_registry: &RowMetadataRegistry,
875) -> Result<LogicalPlan> {
876    let alias_generator = AliasGenerator::new();
877    plan.transform_up(|plan| {
878        match plan {
879            LogicalPlan::TableScan(scan) => {
880                scan_columns_from_row_metadata(scan, catalog_options, row_metadata_registry)
881            }
882            plan => project_row_metadata_from_input(plan, &alias_generator),
883        }
884        .and_then(LogicalPlan::recompute_schema)
885        .map(Transformed::yes)
886    })
887    .map(|t| t.data)
888}
889
890/// Assuming the input has any columns of the form "__meta_<id>",
891/// push up the file columns through the output of this LogicalPlan node.
892/// The output will have a single new column of the form "__meta_<id>".
893fn project_row_metadata_from_input(
894    plan: LogicalPlan,
895    alias_generator: &AliasGenerator,
896) -> Result<LogicalPlan> {
897    use datafusion_expr::logical_plan::*;
898
899    // find all file metadata columns and collapse them into one concatenated list
900    match plan {
901        LogicalPlan::Projection(Projection { expr, input, .. }) => {
902            let file_md_columns = input
903                .schema()
904                .columns()
905                .into_iter()
906                .filter_map(|c| c.name.starts_with(META_COLUMN).then_some(Expr::Column(c)))
907                .collect_vec();
908            Projection::try_new(
909                expr.into_iter()
910                    .chain(Some(
911                        flatten(make_array(file_md_columns))
912                            .alias(alias_generator.next(META_COLUMN)),
913                    ))
914                    .collect_vec(),
915                input,
916            )
917            .map(LogicalPlan::Projection)
918        }
919        _ => {
920            let plan = plan.recompute_schema()?;
921            let (file_md_columns, original_columns) = plan
922                .schema()
923                .columns()
924                .into_iter()
925                .partition::<Vec<_>, _>(|c| c.name.starts_with(META_COLUMN));
926
927            Projection::try_new(
928                original_columns
929                    .into_iter()
930                    .map(Expr::Column)
931                    .chain(Some(
932                        flatten(make_array(
933                            file_md_columns.into_iter().map(Expr::Column).collect_vec(),
934                        ))
935                        .alias(alias_generator.next(META_COLUMN)),
936                    ))
937                    .collect_vec(),
938                Arc::new(plan),
939            )
940            .map(LogicalPlan::Projection)
941        }
942    }
943}
944
945/// Turn a TableScan into an equivalent scan on the row metadata source,
946/// assuming that every column in the table scan is a partition column;
947/// also adds a new column to the TableScan, "__meta"
948/// which is a List of Struct column including the row metadata.
949fn scan_columns_from_row_metadata(
950    scan: TableScan,
951    catalog_options: &CatalogOptions,
952    row_metadata_registry: &RowMetadataRegistry,
953) -> Result<LogicalPlan> {
954    let table_ref = scan.table_name.clone().resolve(
955        &catalog_options.default_catalog,
956        &catalog_options.default_schema,
957    );
958
959    let source = row_metadata_registry.get_source(&table_ref)?;
960
961    // [`RowMetadataSource`] returns a Struct,
962    // but the MV algorithm expects a list of structs at each node in the plan.
963    let mut exprs = scan
964        .projected_schema
965        .fields()
966        .iter()
967        .map(|f| col((None, f)))
968        .collect_vec();
969    exprs.push(make_array(vec![col(META_COLUMN)]).alias(META_COLUMN));
970
971    source
972        .row_metadata(table_ref, &scan)?
973        .project(exprs)?
974        .alias(scan.table_name.clone())?
975        .filter(
976            scan.filters
977                .clone()
978                .into_iter()
979                .fold(lit(true), |a, b| a.and(b)),
980        )?
981        .build()
982}
983
984/// Assemble sources irrespective of partitions
985/// This is more efficient when the materialized view has no partitions,
986/// but less intelligent -- it may return additional dependencies not present in the
987/// usual algorithm.
988//
989// TODO: see if we can optimize the normal logic for no partitions.
990// It seems that joins get transformed into cross joins, which can become extremely inefficient.
991// Hence we had to implement this alternate, simpler but less precise algorithm.
992// Notably, it may include more false positives.
993fn get_source_files_all_partitions(
994    materialized_view: &dyn Materialized,
995    catalog_options: &CatalogOptions,
996    row_metadata_registry: &RowMetadataRegistry,
997) -> Result<LogicalPlan> {
998    use datafusion_common::tree_node::TreeNodeRecursion;
999
1000    let mut tables = std::collections::HashMap::<TableReference, _>::new();
1001
1002    materialized_view
1003        .query()
1004        .apply(|plan| {
1005            if let LogicalPlan::TableScan(scan) = plan {
1006                tables.insert(scan.table_name.clone(), Arc::clone(&scan.source));
1007            }
1008
1009            Ok(TreeNodeRecursion::Continue)
1010        })
1011        .unwrap();
1012
1013    tables
1014        .into_iter()
1015        .try_fold(
1016            None::<LogicalPlanBuilder>,
1017            |maybe_plan, (table_ref, source)| {
1018                let resolved_ref = table_ref.clone().resolve(
1019                    &catalog_options.default_catalog,
1020                    &catalog_options.default_schema,
1021                );
1022
1023                let row_metadata = row_metadata_registry.get_source(&resolved_ref)?;
1024                let row_metadata_scan = row_metadata
1025                    .row_metadata(
1026                        resolved_ref,
1027                        &TableScan {
1028                            table_name: table_ref.clone(),
1029                            source,
1030                            projection: Some(vec![]), // no columns relevant
1031                            projected_schema: Arc::new(DFSchema::empty()),
1032                            filters: vec![],
1033                            fetch: None,
1034                        },
1035                    )?
1036                    .build()?;
1037
1038                if let Some(previous) = maybe_plan {
1039                    previous.union(row_metadata_scan)
1040                } else {
1041                    Ok(LogicalPlanBuilder::from(row_metadata_scan))
1042                }
1043                .map(Some)
1044            },
1045        )?
1046        .ok_or_else(|| DataFusionError::Plan("materialized view has no source tables".into()))?
1047        // [`RowMetadataSource`] returns a Struct,
1048        // but the MV algorithm expects a list of structs at each node in the plan.
1049        .project(vec![make_array(vec![col(META_COLUMN)]).alias(META_COLUMN)])?
1050        .build()
1051}
1052
1053#[cfg(test)]
1054mod test {
1055    use std::{any::Any, collections::HashSet, sync::Arc};
1056
1057    use arrow::util::pretty::pretty_format_batches;
1058    use arrow_schema::{DataType, Field, FieldRef, Fields, SchemaRef};
1059    use datafusion::{
1060        assert_batches_eq, assert_batches_sorted_eq,
1061        catalog::{Session, TableProvider},
1062        datasource::listing::ListingTableUrl,
1063        execution::session_state::SessionStateBuilder,
1064        prelude::{DataFrame, SessionConfig, SessionContext},
1065    };
1066    use datafusion_common::{Column, DFSchema, Result, ScalarValue};
1067    use datafusion_expr::builder::unnest;
1068    use datafusion_expr::{EmptyRelation, Expr, JoinType, LogicalPlan, TableType};
1069    use datafusion_physical_plan::ExecutionPlan;
1070    use itertools::Itertools;
1071
1072    use crate::materialized::{
1073        dependencies::pushdown_projection_inexact,
1074        register_decorator, register_materialized,
1075        row_metadata::{ObjectStoreRowMetadataSource, RowMetadataRegistry},
1076        Decorator, ListingTableLike, Materialized,
1077    };
1078
1079    use super::{mv_dependencies, stale_files};
1080
    /// A mock materialized view.
    #[derive(Debug)]
    struct MockMaterializedView {
        // Root output path of the view, e.g. "s3://m1/".
        table_path: ListingTableUrl,
        // Partition columns of the view's output (appear as key=value path segments
        // in the expected test output).
        partition_columns: Vec<String>,
        static_partition_columns: Option<Vec<String>>, // default = all partition columns
        // The defining query whose dependencies are analyzed.
        query: LogicalPlan,
        // File extension of the view's output files, e.g. ".parquet".
        file_ext: &'static str,
    }
1090
    #[async_trait::async_trait]
    impl TableProvider for MockMaterializedView {
        fn as_any(&self) -> &dyn Any {
            self
        }

        /// The view's schema is derived directly from its defining query.
        fn schema(&self) -> SchemaRef {
            Arc::new(self.query.schema().as_arrow().clone())
        }

        fn table_type(&self) -> TableType {
            TableType::Base
        }

        /// Physical execution is intentionally unimplemented: these tests only
        /// analyze logical plans, so `scan` is never expected to run.
        async fn scan(
            &self,
            _state: &dyn Session,
            _projection: Option<&Vec<usize>>,
            _filters: &[Expr],
            _limit: Option<usize>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            unimplemented!()
        }
    }
1115
    impl ListingTableLike for MockMaterializedView {
        /// The mock view has a single root path.
        fn table_paths(&self) -> Vec<ListingTableUrl> {
            vec![self.table_path.clone()]
        }

        fn partition_columns(&self) -> Vec<String> {
            self.partition_columns.clone()
        }

        fn file_ext(&self) -> String {
            self.file_ext.to_string()
        }
    }
1129
    impl Materialized for MockMaterializedView {
        fn query(&self) -> LogicalPlan {
            self.query.clone()
        }

        /// Falls back to all partition columns when no explicit static set is given.
        fn static_partition_columns(&self) -> Vec<String> {
            self.static_partition_columns
                .clone()
                .unwrap_or_else(|| self.partition_columns.clone())
        }
    }
1141
    /// A [`TableProvider`] that wraps another provider and delegates to it;
    /// registered as a [`Decorator`] so tests can verify that dependency
    /// analysis sees through decorator layers.
    #[derive(Debug)]
    struct DecoratorTable {
        // The wrapped (decorated) provider.
        inner: Arc<dyn TableProvider>,
    }
1146
    #[async_trait::async_trait]
    impl TableProvider for DecoratorTable {
        // `as_any` must return the decorator itself so downcasting finds it.
        fn as_any(&self) -> &dyn Any {
            self
        }

        // The remaining methods delegate to the wrapped provider.
        fn schema(&self) -> SchemaRef {
            self.inner.schema()
        }

        fn table_type(&self) -> TableType {
            self.inner.table_type()
        }

        async fn scan(
            &self,
            state: &dyn Session,
            projection: Option<&Vec<usize>>,
            filters: &[Expr],
            limit: Option<usize>,
        ) -> Result<Arc<dyn ExecutionPlan>> {
            self.inner.scan(state, projection, filters, limit).await
        }
    }
1171
    impl Decorator for DecoratorTable {
        /// Expose the wrapped provider so dependency analysis can unwrap the decorator.
        fn base(&self) -> &dyn TableProvider {
            self.inner.as_ref()
        }
    }
1177
    /// Build a [`SessionContext`] preloaded for the dependency tests:
    /// base tables `t1`/`t2`/`t3`, a mock `file_metadata` table, and the
    /// `mv_dependencies` / `stale_files` UDTFs backed by a
    /// [`RowMetadataRegistry`] over that metadata table.
    async fn setup() -> Result<SessionContext> {
        let _ = env_logger::builder().is_test(true).try_init();

        // Register the mock implementations with the crate's global registries.
        register_materialized::<MockMaterializedView>();
        register_decorator::<DecoratorTable>();

        let state = SessionStateBuilder::new()
            .with_default_features()
            .with_config(
                SessionConfig::new()
                    .with_default_catalog_and_schema("datafusion", "test")
                    .set(
                        "datafusion.explain.logical_plan_only",
                        &ScalarValue::Boolean(Some(true)),
                    )
                    .set(
                        "datafusion.sql_parser.dialect",
                        &ScalarValue::Utf8(Some("duckdb".into())),
                    )
                    .set(
                        // See discussion in this issue:
                        // https://github.com/apache/datafusion/issues/13065
                        "datafusion.execution.skip_physical_aggregate_schema_check",
                        &ScalarValue::Boolean(Some(true)),
                    ),
            )
            .build();

        let ctx = SessionContext::new_with_state(state);

        // Source table t1: unnamed columns (column1..column3).
        ctx.sql(
            "CREATE TABLE t1 AS VALUES
            ('2021', 3, 'A'),
            ('2022', 4, 'B'),
            ('2023', 5, 'C')",
        )
        .await?
        .collect()
        .await?;

        // Source table t2: year/month/day/feed partition-style columns.
        ctx.sql(
            "CREATE TABLE t2 (
                year STRING,
                month STRING,
                day STRING,
                feed CHAR,
                column2 INTEGER
            ) AS VALUES
            ('2023', '01', '01', 'A', 1),
            ('2023', '01', '02', 'B', 2),
            ('2023', '01', '03', 'C', 3),
            ('2024', '12', '04', 'X', 4),
            ('2024', '12', '05', 'Y', 5),
            ('2024', '12', '06', 'Z', 6)",
        )
        .await?
        .collect()
        .await?;

        // Source table t3: used for join test cases.
        ctx.sql(
            "CREATE TABLE t3 (
                year STRING,
                column1 INTEGER
            ) AS VALUES
            (2023, 1),
            (2024, 2)",
        )
        .await?
        .collect()
        .await?;

        ctx.sql(
            // create a fake file metadata table to use as a mock
            "CREATE TABLE file_metadata (
                table_catalog STRING,
                table_schema STRING,
                table_name STRING,
                file_path STRING,
                last_modified TIMESTAMP,
                size BIGINT UNSIGNED
            ) AS VALUES
                ('datafusion', 'test', 't1', 's3://t1/column1=2021/data.01.parquet', '2023-07-11T16:29:26Z', 0),
                ('datafusion', 'test', 't1', 's3://t1/column1=2022/data.01.parquet', '2023-07-11T16:45:22Z', 0),
                ('datafusion', 'test', 't1', 's3://t1/column1=2023/data.01.parquet', '2023-07-11T16:45:44Z', 0),
                ('datafusion', 'test', 't2', 's3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet', '2023-07-11T16:29:26Z', 0),
                ('datafusion', 'test', 't2', 's3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet', '2023-07-11T16:45:22Z', 0),
                ('datafusion', 'test', 't2', 's3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet', '2023-07-11T16:45:44Z', 0),
                ('datafusion', 'test', 't2', 's3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet', '2023-07-11T16:29:26Z', 0),
                ('datafusion', 'test', 't2', 's3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet', '2023-07-11T16:45:22Z', 0),
                ('datafusion', 'test', 't2', 's3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet', '2023-07-11T16:45:44Z', 0),
                ('datafusion', 'test', 't3', 's3://t3/year=2023/data.01.parquet', '2023-07-11T16:45:44Z', 0),
                ('datafusion', 'test', 't3', 's3://t3/year=2024/data.01.parquet', '2023-07-11T16:45:44Z', 0)
            "
        )
        .await?
        .collect()
        .await?;

        // Back the row metadata registry with the mock file_metadata table.
        let metadata_table = ctx.table_provider("file_metadata").await?;
        let object_store_metadata_source = Arc::new(
            ObjectStoreRowMetadataSource::with_file_metadata(Arc::clone(&metadata_table)),
        );

        let row_metadata_registry = Arc::new(RowMetadataRegistry::new_with_default_source(
            object_store_metadata_source,
        ));

        // Expose the two table functions under test.
        ctx.register_udtf(
            "mv_dependencies",
            mv_dependencies(
                Arc::clone(ctx.state().catalog_list()),
                row_metadata_registry.clone(),
                ctx.copied_config().options(),
            ),
        );

        ctx.register_udtf(
            "stale_files",
            stale_files(
                Arc::clone(ctx.state().catalog_list()),
                Arc::clone(&row_metadata_registry),
                metadata_table,
                ctx.copied_config().options(),
            ),
        );

        Ok(ctx)
    }
1306
1307    #[tokio::test]
1308    async fn test_deps() {
1309        #[derive(Debug, Default)]
1310        struct TestCase {
1311            name: &'static str,
1312            query_to_analyze: &'static str,
1313            table_name: &'static str,
1314            table_path: &'static str,
1315            partition_cols: Vec<&'static str>,
1316            static_partition_cols: Option<Vec<&'static str>>,
1317            file_extension: &'static str,
1318            expected_output: Vec<&'static str>,
1319            file_metadata: &'static str,
1320            expected_stale_files_output: Vec<&'static str>,
1321        }
1322
1323        let cases = &[
1324            TestCase { name: "un-transformed partition column",
1325                query_to_analyze:
1326                    "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1",
1327                table_name: "m1",
1328                table_path: "s3://m1/",
1329                partition_cols: vec!["partition_column"],
1330                file_extension: ".parquet",
1331                expected_output: vec![
1332                    "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
1333                    "| target                         | source_table_catalog | source_table_schema | source_table_name | source_uri                           | source_last_modified |",
1334                    "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
1335                    "| s3://m1/partition_column=2021/ | datafusion           | test                | t1                | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26  |",
1336                    "| s3://m1/partition_column=2022/ | datafusion           | test                | t1                | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22  |",
1337                    "| s3://m1/partition_column=2023/ | datafusion           | test                | t1                | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44  |",
1338                    "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
1339                ],
1340                // second file is old
1341                file_metadata: "
1342                    ('datafusion', 'test', 'm1', 's3://m1/partition_column=2021/data.01.parquet', '2023-07-12T16:00:00Z', 0),
1343                    ('datafusion', 'test', 'm1', 's3://m1/partition_column=2022/data.01.parquet', '2023-07-10T16:00:00Z', 0),
1344                    ('datafusion', 'test', 'm1', 's3://m1/partition_column=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0)
1345                ",
1346                expected_stale_files_output: vec![
1347                    "+--------------------------------+----------------------+-----------------------+----------+",
1348                    "| target                         | target_last_modified | sources_last_modified | is_stale |",
1349                    "+--------------------------------+----------------------+-----------------------+----------+",
1350                    "| s3://m1/partition_column=2021/ | 2023-07-12T16:00:00  | 2023-07-11T16:29:26   | false    |",
1351                    "| s3://m1/partition_column=2022/ | 2023-07-10T16:00:00  | 2023-07-11T16:45:22   | true     |",
1352                    "| s3://m1/partition_column=2023/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
1353                    "+--------------------------------+----------------------+-----------------------+----------+",
1354                ],
1355                ..Default::default()
1356            },
1357            TestCase { name: "omit internal metadata partition columns",
1358                query_to_analyze:
1359                    "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1",
1360                table_name: "m1",
1361                table_path: "s3://m1/",
1362                partition_cols: vec!["partition_column"],
1363                file_extension: ".parquet",
1364                expected_output: vec![
1365                    "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
1366                    "| target                         | source_table_catalog | source_table_schema | source_table_name | source_uri                           | source_last_modified |",
1367                    "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
1368                    "| s3://m1/partition_column=2021/ | datafusion           | test                | t1                | s3://t1/column1=2021/data.01.parquet | 2023-07-11T16:29:26  |",
1369                    "| s3://m1/partition_column=2022/ | datafusion           | test                | t1                | s3://t1/column1=2022/data.01.parquet | 2023-07-11T16:45:22  |",
1370                    "| s3://m1/partition_column=2023/ | datafusion           | test                | t1                | s3://t1/column1=2023/data.01.parquet | 2023-07-11T16:45:44  |",
1371                    "+--------------------------------+----------------------+---------------------+-------------------+--------------------------------------+----------------------+",
1372                ],
1373                // second file is old
1374                file_metadata: "
1375                    ('datafusion', 'test', 'm1', 's3://m1/partition_column=2021/_v=123/data.01.parquet', '2023-07-12T16:00:00Z', 0),
1376                    ('datafusion', 'test', 'm1', 's3://m1/partition_column=2022/_v=123/data.01.parquet', '2023-07-10T16:00:00Z', 0),
1377                    ('datafusion', 'test', 'm1', 's3://m1/partition_column=2023/_v=123/data.01.parquet', '2023-07-12T16:00:00Z', 0)
1378                ",
1379                expected_stale_files_output: vec![
1380                    "+--------------------------------+----------------------+-----------------------+----------+",
1381                    "| target                         | target_last_modified | sources_last_modified | is_stale |",
1382                    "+--------------------------------+----------------------+-----------------------+----------+",
1383                    "| s3://m1/partition_column=2021/ | 2023-07-12T16:00:00  | 2023-07-11T16:29:26   | false    |",
1384                    "| s3://m1/partition_column=2022/ | 2023-07-10T16:00:00  | 2023-07-11T16:45:22   | true     |",
1385                    "| s3://m1/partition_column=2023/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
1386                    "+--------------------------------+----------------------+-----------------------+----------+",
1387                ],
1388                ..Default::default()
1389            },
1390            TestCase {
1391                name: "transform year/month/day partition into timestamp partition",
1392                query_to_analyze: "
1393                SELECT DISTINCT
1394                    to_timestamp_nanos(concat_ws('-', year, month, day)) AS timestamp,
1395                    feed
1396                FROM t2",
1397                table_name: "m2",
1398                table_path: "s3://m2/",
1399                partition_cols: vec!["timestamp", "feed"],
1400                file_extension: ".parquet",
1401                expected_output: vec![
1402                    "+-----------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
1403                    "| target                                        | source_table_catalog | source_table_schema | source_table_name | source_uri                                               | source_last_modified |",
1404                    "+-----------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
1405                    "| s3://m2/timestamp=2023-01-01T00:00:00/feed=A/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26  |",
1406                    "| s3://m2/timestamp=2023-01-02T00:00:00/feed=B/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22  |",
1407                    "| s3://m2/timestamp=2023-01-03T00:00:00/feed=C/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44  |",
1408                    "| s3://m2/timestamp=2024-12-04T00:00:00/feed=X/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26  |",
1409                    "| s3://m2/timestamp=2024-12-05T00:00:00/feed=Y/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22  |",
1410                    "| s3://m2/timestamp=2024-12-06T00:00:00/feed=Z/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44  |",
1411                    "+-----------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
1412                ],
1413                file_metadata: "
1414                    ('datafusion', 'test', 'm2', 's3://m2/timestamp=2023-01-01T00:00:00/feed=A/data.01.parquet', '2023-07-12T16:00:00Z', 0),
1415                    ('datafusion', 'test', 'm2', 's3://m2/timestamp=2023-01-02T00:00:00/feed=B/data.01.parquet', '2023-07-12T16:00:00Z', 0),
1416                    ('datafusion', 'test', 'm2', 's3://m2/timestamp=2023-01-03T00:00:00/feed=C/data.01.parquet', '2023-07-10T16:00:00Z', 0),
1417                    ('datafusion', 'test', 'm2', 's3://m2/timestamp=2024-12-04T00:00:00/feed=X/data.01.parquet', '2023-07-12T16:00:00Z', 0),
1418                    ('datafusion', 'test', 'm2', 's3://m2/timestamp=2024-12-05T00:00:00/feed=Y/data.01.parquet', '2023-07-12T16:00:00Z', 0),
1419                    ('datafusion', 'test', 'm2', 's3://m2/timestamp=2024-12-06T00:00:00/feed=Z/data.01.parquet', '2023-07-10T16:00:00Z', 0)
1420                ",
1421                expected_stale_files_output: vec![
1422                    "+-----------------------------------------------+----------------------+-----------------------+----------+",
1423                    "| target                                        | target_last_modified | sources_last_modified | is_stale |",
1424                    "+-----------------------------------------------+----------------------+-----------------------+----------+",
1425                    "| s3://m2/timestamp=2023-01-01T00:00:00/feed=A/ | 2023-07-12T16:00:00  | 2023-07-11T16:29:26   | false    |",
1426                    "| s3://m2/timestamp=2023-01-02T00:00:00/feed=B/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:22   | false    |",
1427                    "| s3://m2/timestamp=2023-01-03T00:00:00/feed=C/ | 2023-07-10T16:00:00  | 2023-07-11T16:45:44   | true     |",
1428                    "| s3://m2/timestamp=2024-12-04T00:00:00/feed=X/ | 2023-07-12T16:00:00  | 2023-07-11T16:29:26   | false    |",
1429                    "| s3://m2/timestamp=2024-12-05T00:00:00/feed=Y/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:22   | false    |",
1430                    "| s3://m2/timestamp=2024-12-06T00:00:00/feed=Z/ | 2023-07-10T16:00:00  | 2023-07-11T16:45:44   | true     |",
1431                    "+-----------------------------------------------+----------------------+-----------------------+----------+",
1432                ],
1433                ..Default::default()
1434            },
1435            TestCase {
1436                name: "omit dynamic partition columns",
1437                query_to_analyze: "
1438                SELECT
1439                    year,
1440                    month,
1441                    day,
1442                    column2,
1443                    COUNT(*) AS ct
1444                FROM t2
1445                GROUP BY year, month, day, column2
1446                ",
1447                table_name: "m_dynamic",
1448                table_path: "s3://m_dynamic/",
1449                partition_cols: vec!["year", "month", "day", "column2"],
1450                static_partition_cols: Some(vec!["year", "month", "day"]),
1451                file_extension: ".parquet",
1452                expected_output: vec![
1453                    "+-------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
1454                    "| target                                    | source_table_catalog | source_table_schema | source_table_name | source_uri                                               | source_last_modified |",
1455                    "+-------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
1456                    "| s3://m_dynamic/year=2023/month=01/day=01/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26  |",
1457                    "| s3://m_dynamic/year=2023/month=01/day=02/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22  |",
1458                    "| s3://m_dynamic/year=2023/month=01/day=03/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44  |",
1459                    "| s3://m_dynamic/year=2024/month=12/day=04/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26  |",
1460                    "| s3://m_dynamic/year=2024/month=12/day=05/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22  |",
1461                    "| s3://m_dynamic/year=2024/month=12/day=06/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44  |",
1462                    "+-------------------------------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
1463                ],
1464                file_metadata: "
1465                    ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2023/month=01/day=01/column2=1/data.01.parquet', '2023-07-12T16:00:00Z', 0),
1466                    ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2023/month=01/day=02/column2=2/data.01.parquet', '2023-07-12T16:00:00Z', 0),
1467                    ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2023/month=01/day=03/column2=3/data.01.parquet', '2023-07-10T16:00:00Z', 0),
1468                    ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2024/month=12/day=04/column2=4/data.01.parquet', '2023-07-12T16:00:00Z', 0),
1469                    ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2024/month=12/day=05/column2=5/data.01.parquet', '2023-07-12T16:00:00Z', 0),
1470                    ('datafusion', 'test', 'm_dynamic', 's3://m_dynamic/year=2024/month=12/day=06/column2=6/data.01.parquet', '2023-07-10T16:00:00Z', 0)
1471                ",
1472                expected_stale_files_output: vec![
1473                    "+-------------------------------------------+----------------------+-----------------------+----------+",
1474                    "| target                                    | target_last_modified | sources_last_modified | is_stale |",
1475                    "+-------------------------------------------+----------------------+-----------------------+----------+",
1476                    "| s3://m_dynamic/year=2023/month=01/day=01/ | 2023-07-12T16:00:00  | 2023-07-11T16:29:26   | false    |",
1477                    "| s3://m_dynamic/year=2023/month=01/day=02/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:22   | false    |",
1478                    "| s3://m_dynamic/year=2023/month=01/day=03/ | 2023-07-10T16:00:00  | 2023-07-11T16:45:44   | true     |",
1479                    "| s3://m_dynamic/year=2024/month=12/day=04/ | 2023-07-12T16:00:00  | 2023-07-11T16:29:26   | false    |",
1480                    "| s3://m_dynamic/year=2024/month=12/day=05/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:22   | false    |",
1481                    "| s3://m_dynamic/year=2024/month=12/day=06/ | 2023-07-10T16:00:00  | 2023-07-11T16:45:44   | true     |",
1482                    "+-------------------------------------------+----------------------+-----------------------+----------+",
1483                ],
1484            },
1485            TestCase {
1486                name: "materialized view has no partitions",
1487                query_to_analyze: "SELECT column1 AS output FROM t3",
1488                table_name: "m3",
1489                table_path: "s3://m3/",
1490                partition_cols: vec![],
1491                file_extension: ".parquet",
1492                expected_output: vec![
1493                    "+----------+----------------------+---------------------+-------------------+-----------------------------------+----------------------+",
1494                    "| target   | source_table_catalog | source_table_schema | source_table_name | source_uri                        | source_last_modified |",
1495                    "+----------+----------------------+---------------------+-------------------+-----------------------------------+----------------------+",
1496                    "| s3://m3/ | datafusion           | test                | t3                | s3://t3/year=2023/data.01.parquet | 2023-07-11T16:45:44  |",
1497                    "| s3://m3/ | datafusion           | test                | t3                | s3://t3/year=2024/data.01.parquet | 2023-07-11T16:45:44  |",
1498                    "+----------+----------------------+---------------------+-------------------+-----------------------------------+----------------------+",
1499                ],
1500                file_metadata: "
1501                    ('datafusion', 'test', 'm3', 's3://m3/data.01.parquet', '2023-07-12T16:00:00Z', 0)
1502                ",
1503                expected_stale_files_output: vec![
1504                    "+----------+----------------------+-----------------------+----------+",
1505                    "| target   | target_last_modified | sources_last_modified | is_stale |",
1506                    "+----------+----------------------+-----------------------+----------+",
1507                    "| s3://m3/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
1508                    "+----------+----------------------+-----------------------+----------+",
1509                ],
1510                ..Default::default()
1511            },
1512            TestCase {
1513                name: "simple equijoin on year",
1514                query_to_analyze: "SELECT * FROM t2 INNER JOIN t3 USING (year)",
1515                table_name: "m4",
1516                table_path: "s3://m4/",
1517                partition_cols: vec!["year"],
1518                file_extension: ".parquet",
1519                expected_output: vec![
1520                    "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
1521                    "| target             | source_table_catalog | source_table_schema | source_table_name | source_uri                                               | source_last_modified |",
1522                    "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
1523                    "| s3://m4/year=2023/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26  |",
1524                    "| s3://m4/year=2023/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22  |",
1525                    "| s3://m4/year=2023/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44  |",
1526                    "| s3://m4/year=2023/ | datafusion           | test                | t3                | s3://t3/year=2023/data.01.parquet                        | 2023-07-11T16:45:44  |",
1527                    "| s3://m4/year=2024/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26  |",
1528                    "| s3://m4/year=2024/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22  |",
1529                    "| s3://m4/year=2024/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44  |",
1530                    "| s3://m4/year=2024/ | datafusion           | test                | t3                | s3://t3/year=2024/data.01.parquet                        | 2023-07-11T16:45:44  |",
1531                    "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
1532                ],
1533                file_metadata: "
1534                    ('datafusion', 'test', 'm4', 's3://m4/year=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0),
1535                    ('datafusion', 'test', 'm4', 's3://m4/year=2024/data.01.parquet', '2023-07-12T16:00:00Z', 0)
1536                ",
1537                expected_stale_files_output: vec![
1538                    "+--------------------+----------------------+-----------------------+----------+",
1539                    "| target             | target_last_modified | sources_last_modified | is_stale |",
1540                    "+--------------------+----------------------+-----------------------+----------+",
1541                    "| s3://m4/year=2023/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
1542                    "| s3://m4/year=2024/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
1543                    "+--------------------+----------------------+-----------------------+----------+",
1544                ],
1545                ..Default::default()
1546            },
1547            TestCase {
1548                name: "triangular join on year",
1549                query_to_analyze: "
1550                    SELECT
1551                        t2.*,
1552                        t3.* EXCLUDE(year),
1553                        t3.year AS \"t3.year\"
1554                    FROM t2
1555                    INNER JOIN t3
1556                    ON (t2.year <= t3.year)",
1557                table_name: "m4",
1558                table_path: "s3://m4/",
1559                partition_cols: vec!["year"],
1560                file_extension: ".parquet",
1561                expected_output: vec![
1562                    "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
1563                    "| target             | source_table_catalog | source_table_schema | source_table_name | source_uri                                               | source_last_modified |",
1564                    "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
1565                    "| s3://m4/year=2023/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26  |",
1566                    "| s3://m4/year=2023/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22  |",
1567                    "| s3://m4/year=2023/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44  |",
1568                    "| s3://m4/year=2023/ | datafusion           | test                | t3                | s3://t3/year=2023/data.01.parquet                        | 2023-07-11T16:45:44  |",
1569                    "| s3://m4/year=2023/ | datafusion           | test                | t3                | s3://t3/year=2024/data.01.parquet                        | 2023-07-11T16:45:44  |",
1570                    "| s3://m4/year=2024/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26  |",
1571                    "| s3://m4/year=2024/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22  |",
1572                    "| s3://m4/year=2024/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44  |",
1573                    "| s3://m4/year=2024/ | datafusion           | test                | t3                | s3://t3/year=2024/data.01.parquet                        | 2023-07-11T16:45:44  |",
1574                    "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
1575                ],
1576                file_metadata: "
1577                    ('datafusion', 'test', 'm4', 's3://m4/year=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0),
1578                    ('datafusion', 'test', 'm4', 's3://m4/year=2024/data.01.parquet', '2023-07-12T16:00:00Z', 0)
1579                ",
1580                expected_stale_files_output: vec![
1581                    "+--------------------+----------------------+-----------------------+----------+",
1582                    "| target             | target_last_modified | sources_last_modified | is_stale |",
1583                    "+--------------------+----------------------+-----------------------+----------+",
1584                    "| s3://m4/year=2023/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
1585                    "| s3://m4/year=2024/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
1586                    "+--------------------+----------------------+-----------------------+----------+",
1587                ],
1588                ..Default::default()
1589            },
1590            TestCase {
1591                name: "triangular left join, strict <",
1592                query_to_analyze: "
1593                    SELECT
1594                        t2.*,
1595                        t3.* EXCLUDE(year),
1596                        t3.year AS \"t3.year\"
1597                    FROM t2
1598                    LEFT JOIN t3
1599                    ON (t2.year < t3.year)",
1600                table_name: "m4",
1601                table_path: "s3://m4/",
1602                partition_cols: vec!["year"],
1603                file_extension: ".parquet",
1604                expected_output: vec![
1605                    "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
1606                    "| target             | source_table_catalog | source_table_schema | source_table_name | source_uri                                               | source_last_modified |",
1607                    "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
1608                    "| s3://m4/year=2023/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=01/feed=A/data.01.parquet | 2023-07-11T16:29:26  |",
1609                    "| s3://m4/year=2023/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=02/feed=B/data.01.parquet | 2023-07-11T16:45:22  |",
1610                    "| s3://m4/year=2023/ | datafusion           | test                | t2                | s3://t2/year=2023/month=01/day=03/feed=C/data.01.parquet | 2023-07-11T16:45:44  |",
1611                    "| s3://m4/year=2023/ | datafusion           | test                | t3                | s3://t3/year=2024/data.01.parquet                        | 2023-07-11T16:45:44  |",
1612                    "| s3://m4/year=2024/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=04/feed=X/data.01.parquet | 2023-07-11T16:29:26  |",
1613                    "| s3://m4/year=2024/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=05/feed=Y/data.01.parquet | 2023-07-11T16:45:22  |",
1614                    "| s3://m4/year=2024/ | datafusion           | test                | t2                | s3://t2/year=2024/month=12/day=06/feed=Z/data.01.parquet | 2023-07-11T16:45:44  |",
1615                    "+--------------------+----------------------+---------------------+-------------------+----------------------------------------------------------+----------------------+",
1616                ],
1617                file_metadata: "
1618                    ('datafusion', 'test', 'm4', 's3://m4/year=2023/data.01.parquet', '2023-07-12T16:00:00Z', 0),
1619                    ('datafusion', 'test', 'm4', 's3://m4/year=2024/data.01.parquet', '2023-07-12T16:00:00Z', 0)
1620                ",
1621                expected_stale_files_output: vec![
1622                    "+--------------------+----------------------+-----------------------+----------+",
1623                    "| target             | target_last_modified | sources_last_modified | is_stale |",
1624                    "+--------------------+----------------------+-----------------------+----------+",
1625                    "| s3://m4/year=2023/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
1626                    "| s3://m4/year=2024/ | 2023-07-12T16:00:00  | 2023-07-11T16:45:44   | false    |",
1627                    "+--------------------+----------------------+-----------------------+----------+",
1628                ],
1629                ..Default::default()
1630            },
1631        ];
1632
1633        async fn run_test(case: &TestCase) -> Result<()> {
1634            let context = setup().await.unwrap();
1635
1636            let plan = context
1637                .sql(case.query_to_analyze)
1638                .await?
1639                .into_optimized_plan()?;
1640
1641            println!("original plan: \n{}", plan.display_indent());
1642
1643            let partition_col_indices = plan
1644                .schema()
1645                .columns()
1646                .into_iter()
1647                .enumerate()
1648                .filter_map(|(i, c)| case.partition_cols.contains(&c.name.as_str()).then_some(i))
1649                .collect();
1650            println!("indices: {partition_col_indices:?}");
1651            let analyzed = pushdown_projection_inexact(plan.clone(), &partition_col_indices)?;
1652            println!(
1653                "inexact projection pushdown:\n{}",
1654                analyzed.display_indent()
1655            );
1656
1657            context
1658                .register_table(
1659                    case.table_name,
1660                    // Register table with a decorator to exercise this functionality
1661                    Arc::new(DecoratorTable {
1662                        inner: Arc::new(MockMaterializedView {
1663                            table_path: ListingTableUrl::parse(case.table_path).unwrap(),
1664                            partition_columns: case
1665                                .partition_cols
1666                                .iter()
1667                                .map(|s| s.to_string())
1668                                .collect(),
1669                            static_partition_columns: case
1670                                .static_partition_cols
1671                                .as_ref()
1672                                .map(|list| list.iter().map(|s| s.to_string()).collect()),
1673                            query: plan,
1674                            file_ext: case.file_extension,
1675                        }),
1676                    }),
1677                )
1678                .expect("couldn't register materialized view");
1679
1680            context
1681                .sql(&format!(
1682                    "INSERT INTO file_metadata VALUES {}",
1683                    case.file_metadata,
1684                ))
1685                .await?
1686                .collect()
1687                .await?;
1688
1689            context
1690                .sql(&format!(
1691                    "SELECT * FROM file_metadata WHERE table_name = '{}'",
1692                    case.table_name
1693                ))
1694                .await?
1695                .show()
1696                .await?;
1697
1698            let df = context
1699                .sql(&format!(
1700                    "SELECT * FROM mv_dependencies('{}', 'v2')",
1701                    case.table_name,
1702                ))
1703                .await
1704                .map_err(|e| e.context("get file dependencies"))?;
1705            df.clone().explain(false, false)?.show().await?;
1706            df.clone().show().await?;
1707
1708            assert_batches_sorted_eq!(case.expected_output, &df.collect().await?);
1709
1710            let df = context
1711                .sql(&format!(
1712                    "SELECT * FROM stale_files('{}', 'v2')",
1713                    case.table_name
1714                ))
1715                .await
1716                .map_err(|e| e.context("get stale files"))?;
1717            df.clone().explain(false, false)?.show().await?;
1718            df.clone().show().await?;
1719
1720            assert_batches_sorted_eq!(case.expected_stale_files_output, &df.collect().await?);
1721
1722            Ok(())
1723        }
1724
        // Run every case sequentially; a failure panics with the case's name
        // so the offending scenario is identifiable in the test output.
        for case in cases {
            run_test(case)
                .await
                .unwrap_or_else(|e| panic!("{} failed: {e}", case.name));
        }
1730    }
1731
1732    #[tokio::test]
1733    async fn test_projection_pushdown_inexact() -> Result<()> {
        /// One scenario for the inexact projection-pushdown test: a query to
        /// analyze, the columns to project, and the expected plan and output.
        struct TestCase {
            // Label identifying the scenario in test output.
            name: &'static str,
            // SQL whose optimized logical plan is fed to the pushdown.
            query_to_analyze: &'static str,
            // Output column names to retain in the inexact projection.
            projection: &'static [&'static str],
            // Expected EXPLAIN-style rendering of the plan after pushdown.
            expected_plan: Vec<&'static str>,
            // Expected rows from executing the pruned plan.
            expected_output: Vec<&'static str>,
        }
1741
1742        let cases = &[
1743            TestCase {
1744                name: "simple projection",
1745                query_to_analyze:
1746                    "SELECT column1 AS partition_column, concat(column2, column3) AS some_value FROM t1",
1747                projection: &["partition_column"],
1748                expected_plan: vec![
1749                    "+--------------+--------------------------------------------+",
1750                    "| plan_type    | plan                                       |",
1751                    "+--------------+--------------------------------------------+",
1752                    "| logical_plan | Projection: t1.column1 AS partition_column |",
1753                    "|              |   TableScan: t1 projection=[column1]       |",
1754                    "+--------------+--------------------------------------------+",
1755                ],
1756                expected_output: vec![
1757                    "+------------------+",
1758                    "| partition_column |",
1759                    "+------------------+",
1760                    "| 2021             |",
1761                    "| 2022             |",
1762                    "| 2023             |",
1763                    "+------------------+",
1764                ],
1765            },
1766            TestCase {
1767                name: "compound expressions",
1768                query_to_analyze: "
1769                    SELECT DISTINCT
1770                        to_timestamp_nanos(concat_ws('-', year, month, day)) AS timestamp,
1771                        feed
1772                    FROM t2",
1773                projection: &["timestamp", "feed"],
1774                expected_plan: vec![
1775                    "+--------------+-------------------------------------------------------------------------------------------------------+",
1776                    "| plan_type    | plan                                                                                                  |",
1777                    "+--------------+-------------------------------------------------------------------------------------------------------+",
1778                    "| logical_plan | Projection: to_timestamp_nanos(concat_ws(Utf8(\"-\"), t2.year, t2.month, t2.day)) AS timestamp, t2.feed |",
1779                    "|              |   TableScan: t2 projection=[year, month, day, feed]                                                   |",
1780                    "+--------------+-------------------------------------------------------------------------------------------------------+",
1781                ]
1782                ,
1783                expected_output: vec![
1784                    "+---------------------+------+",
1785                    "| timestamp           | feed |",
1786                    "+---------------------+------+",
1787                    "| 2023-01-01T00:00:00 | A    |",
1788                    "| 2023-01-02T00:00:00 | B    |",
1789                    "| 2023-01-03T00:00:00 | C    |",
1790                    "| 2024-12-04T00:00:00 | X    |",
1791                    "| 2024-12-05T00:00:00 | Y    |",
1792                    "| 2024-12-06T00:00:00 | Z    |",
1793                    "+---------------------+------+",
1794                ],
1795            },
1796            TestCase {
1797                name: "empty projection",
1798                query_to_analyze: "SELECT column1 AS output FROM t3",
1799                projection: &[],
1800                expected_plan: vec![
1801                    "+--------------+-----------------------------+",
1802                    "| plan_type    | plan                        |",
1803                    "+--------------+-----------------------------+",
1804                    "| logical_plan | TableScan: t3 projection=[] |",
1805                    "+--------------+-----------------------------+",
1806                ],
1807                expected_output: vec![
1808                    "++",
1809                    "++",
1810                    "++",
1811                ],
1812            },
1813            TestCase {
1814                name: "simple equijoin on year",
1815                query_to_analyze: "SELECT * FROM t2 INNER JOIN t3 USING (year)",
1816                projection: &["year"],
1817                expected_plan: vec![
1818                    "+--------------+-------------------------------------+",
1819                    "| plan_type    | plan                                |",
1820                    "+--------------+-------------------------------------+",
1821                    "| logical_plan | Projection: t2.year                 |",
1822                    "|              |   Inner Join: t2.year = t3.year     |",
1823                    "|              |     TableScan: t2 projection=[year] |",
1824                    "|              |     TableScan: t3 projection=[year] |",
1825                    "+--------------+-------------------------------------+",
1826                ],
1827                expected_output: vec![
1828                    "+------+",
1829                    "| year |",
1830                    "+------+",
1831                    "| 2023 |",
1832                    "| 2023 |",
1833                    "| 2023 |",
1834                    "| 2024 |",
1835                    "| 2024 |",
1836                    "| 2024 |",
1837                    "+------+",
1838                ],
1839            },
1840            TestCase {
1841                name: "triangular join on year",
1842                query_to_analyze: "
1843                    SELECT
1844                        t2.*,
1845                        t3.* EXCLUDE(year),
1846                        t3.year AS \"t3.year\"
1847                    FROM t2
1848                    INNER JOIN t3
1849                    ON (t2.year <= t3.year)",
1850                projection: &["year"],
1851                expected_plan: vec![
1852                    "+--------------+-------------------------------------------+",
1853                    "| plan_type    | plan                                      |",
1854                    "+--------------+-------------------------------------------+",
1855                    "| logical_plan | Projection: t2.year                       |",
1856                    "|              |   Inner Join:  Filter: t2.year <= t3.year |",
1857                    "|              |     TableScan: t2 projection=[year]       |",
1858                    "|              |     TableScan: t3 projection=[year]       |",
1859                    "+--------------+-------------------------------------------+",
1860                ],
1861                expected_output: vec![
1862                    "+------+",
1863                    "| year |",
1864                    "+------+",
1865                    "| 2023 |",
1866                    "| 2023 |",
1867                    "| 2023 |",
1868                    "| 2023 |",
1869                    "| 2023 |",
1870                    "| 2023 |",
1871                    "| 2024 |",
1872                    "| 2024 |",
1873                    "| 2024 |",
1874                    "+------+",
1875                ],
1876            },
1877            TestCase {
1878                name: "window & unnest",
1879                query_to_analyze: "
1880                SELECT
1881                    \"__unnest_placeholder(date).year\" AS year,
1882                    \"__unnest_placeholder(date).month\" AS month,
1883                    \"__unnest_placeholder(date).day\" AS day,
1884                    arr
1885                FROM (
1886                    SELECT
1887                        unnest(date),
1888                        unnest(arr) AS arr
1889                    FROM (
1890                        SELECT
1891                            named_struct('year', year, 'month', month, 'day', day) AS date,
1892                            array_agg(column2)
1893                                OVER (ORDER BY year, month, day)
1894                                AS arr
1895                        FROM t2
1896                    )
1897                )",
1898                projection: &["year", "month", "day"],
1899                expected_plan: vec![
1900                    "+--------------+---------------------------------------------------------------------------------------------------------------------------------------+",
1901                    "| plan_type    | plan                                                                                                                                  |",
1902                    "+--------------+---------------------------------------------------------------------------------------------------------------------------------------+",
1903                    "| logical_plan | Projection: __unnest_placeholder(date).year AS year, __unnest_placeholder(date).month AS month, __unnest_placeholder(date).day AS day |",
1904                    "|              |   Unnest: lists[] structs[__unnest_placeholder(date)]                                                                                 |",
1905                    "|              |     Projection: named_struct(Utf8(\"year\"), t2.year, Utf8(\"month\"), t2.month, Utf8(\"day\"), t2.day) AS __unnest_placeholder(date)       |",
1906                    "|              |       TableScan: t2 projection=[year, month, day]                                                                                     |",
1907                    "+--------------+---------------------------------------------------------------------------------------------------------------------------------------+",
1908                ],
1909                expected_output: vec![
1910                    "+------+-------+-----+",
1911                    "| year | month | day |",
1912                    "+------+-------+-----+",
1913                    "| 2023 | 01    | 01  |",
1914                    "| 2023 | 01    | 02  |",
1915                    "| 2023 | 01    | 03  |",
1916                    "| 2024 | 12    | 04  |",
1917                    "| 2024 | 12    | 05  |",
1918                    "| 2024 | 12    | 06  |",
1919                    "+------+-------+-----+",
1920                ],
1921            },
1922            TestCase {
1923                name: "outer join + union",
1924                query_to_analyze: "
1925                SELECT
1926                    COALESCE(t1.year, t2.year) AS year,
1927                    t1.column2
1928                FROM (SELECT column1 AS year, column2 FROM t1) t1
1929                FULL OUTER JOIN (SELECT year, column2 FROM t2) t2
1930                USING (year)
1931                UNION ALL
1932                SELECT year, column1 AS column2 FROM t3
1933                ",
1934                projection: &["year"],
1935                expected_plan: vec![
1936                    "+--------------+--------------------------------------------------------------------+",
1937                    "| plan_type    | plan                                                               |",
1938                    "+--------------+--------------------------------------------------------------------+",
1939                    "| logical_plan | Union                                                              |",
1940                    "|              |   Projection: coalesce(CAST(t1.year AS Utf8View), t2.year) AS year |",
1941                    "|              |     Full Join: Using CAST(t1.year AS Utf8View) = t2.year           |",
1942                    "|              |       SubqueryAlias: t1                                            |",
1943                    "|              |         Projection: t1.column1 AS year                             |",
1944                    "|              |           TableScan: t1 projection=[column1]                       |",
1945                    "|              |       SubqueryAlias: t2                                            |",
1946                    "|              |         TableScan: t2 projection=[year]                            |",
1947                    "|              |   TableScan: t3 projection=[year]                                  |",
1948                    "+--------------+--------------------------------------------------------------------+",
1949                ],
1950                expected_output: vec![
1951                    "+------+",
1952                    "| year |",
1953                    "+------+",
1954                    "| 2021 |",
1955                    "| 2022 |",
1956                    "| 2023 |",
1957                    "| 2023 |",
1958                    "| 2023 |",
1959                    "| 2023 |",
1960                    "| 2024 |",
1961                    "| 2024 |",
1962                    "| 2024 |",
1963                    "| 2024 |",
1964                    "+------+",
1965                ],
1966            }
1967        ];
1968
1969        async fn run_test(case: &TestCase) -> Result<()> {
1970            let context = setup().await?;
1971
1972            let df = context.sql(case.query_to_analyze).await?;
1973            df.clone().explain(false, false)?.show().await?;
1974
1975            let plan = df.clone().into_optimized_plan()?;
1976
1977            let indices = case
1978                .projection
1979                .iter()
1980                .map(|&name| {
1981                    plan.schema()
1982                        .index_of_column(&Column::new_unqualified(name))
1983                })
1984                .collect::<Result<HashSet<_>>>()?;
1985
1986            let analyzed = DataFrame::new(
1987                context.state(),
1988                pushdown_projection_inexact(plan.clone(), &indices)?,
1989            );
1990            analyzed.clone().explain(false, false)?.show().await?;
1991
1992            // Check the following property of pushdown_projection_inexact:
1993            // if A' = pushdown_projection_inexact(A, P), where P is the projection,
1994            // then PA ⊆ A'.
1995            if !case.projection.is_empty() {
1996                let select_original = df
1997                    .clone()
1998                    .select(
1999                        case.projection
2000                            .iter()
2001                            .map(|&name| Expr::Column(Column::new_unqualified(name)))
2002                            .collect_vec(),
2003                    )
2004                    .map_err(|e| e.context("select projection from original plan"))?
2005                    .distinct()?;
2006
2007                let excess = analyzed
2008                    .clone()
2009                    .distinct()?
2010                    .join(
2011                        select_original.clone(),
2012                        JoinType::RightAnti,
2013                        case.projection,
2014                        case.projection,
2015                        None,
2016                    )
2017                    .map_err(|e| e.context("join in subset inclusion test"))?;
2018
2019                assert_eq!(
2020                    excess
2021                        .clone()
2022                        .count()
2023                        .await
2024                        .map_err(|e| e.context("execute subset inclusion test"))?,
2025                    0,
2026                    "unexpected extra rows in transformed query:\n{}
2027                            original:\n{}
2028                            inexact pushdown:\n{}
2029                            ",
2030                    pretty_format_batches(&excess.collect().await?)?,
2031                    pretty_format_batches(&select_original.collect().await?)?,
2032                    pretty_format_batches(&analyzed.clone().distinct()?.collect().await?)?
2033                );
2034            }
2035
2036            assert_batches_eq!(
2037                case.expected_plan,
2038                &analyzed.clone().explain(false, false)?.collect().await?
2039            );
2040            assert_batches_sorted_eq!(case.expected_output, &analyzed.collect().await?);
2041
2042            Ok(())
2043        }
2044
2045        for case in cases {
2046            run_test(case)
2047                .await
2048                .unwrap_or_else(|e| panic!("{} failed: {e}", case.name));
2049        }
2050
2051        Ok(())
2052    }
2053
2054    #[test]
2055    fn test_pushdown_unnest_guard_partition_date_only() -> Result<()> {
2056        // This test simulates a simplified MV scenario:
2057        //
2058        // WITH events_structs AS (
2059        //   SELECT id, date, unnest(events) AS evs
2060        //   FROM base_table
2061        // ),
2062        // flattened_events AS (
2063        //   SELECT id, date, evs.event_type, evs.event_time
2064        //   FROM events_structs
2065        // ),
2066        // SELECT id, date, MAX(...) ...
2067        // GROUP BY id, date
2068        //
2069        // The partition column is "date". During dependency plan
2070        // building we only request "date" from this subtree,
2071        // so pushdown_projection_inexact receives indices for
2072        // the `date` column only. The guard must kick in:
2073        // unnest(events) becomes unused, and the plan should
2074        // collapse to just projecting `date` from the child.
2075
2076        // 1. Build schema for base table
2077        let id = Field::new("id", DataType::Utf8, true);
2078        let date = Field::new("date", DataType::Utf8, true);
2079
2080        // events: list<struct<event_type, event_time>>
2081        let event_type = Field::new("event_type", DataType::Utf8, true);
2082        let event_time = Field::new("event_time", DataType::Utf8, true);
2083        let events_struct = Field::new(
2084            "item",
2085            DataType::Struct(Fields::from(vec![event_type, event_time])),
2086            true,
2087        );
2088        let events = Field::new(
2089            "events",
2090            DataType::List(FieldRef::from(Box::new(events_struct))),
2091            true,
2092        );
2093
2094        // Build DFSchema: (id, date, events)
2095        let qualified_fields = vec![
2096            (None, Arc::new(id.clone())),
2097            (None, Arc::new(date.clone())),
2098            (None, Arc::new(events.clone())),
2099        ];
2100        let df_schema =
2101            DFSchema::new_with_metadata(qualified_fields, std::collections::HashMap::new())?;
2102
2103        // 2. Build a dummy child plan (EmptyRelation with the schema)
2104        let empty = LogicalPlan::EmptyRelation(EmptyRelation {
2105            produce_one_row: false,
2106            schema: Arc::new(df_schema),
2107        });
2108
2109        // 3. Wrap it with an Unnest node on the "events" column
2110        let events_col = Column::from_name("events");
2111        let unnest_plan = unnest(empty.clone(), vec![events_col.clone()])?;
2112
2113        // 4. Partition column is "date". Look up its actual index dynamically.
2114        let date_idx = unnest_plan
2115            .schema()
2116            .index_of_column(&Column::from_name("date"))?;
2117        let mut indices: HashSet<usize> = HashSet::new();
2118        indices.insert(date_idx);
2119
2120        // 5. Call pushdown_projection_inexact with {date}
2121        let res = pushdown_projection_inexact(unnest_plan, &indices)?;
2122
2123        // 6. Assert the result schema only contains `date`
2124        let cols: Vec<String> = res
2125            .schema()
2126            .fields()
2127            .iter()
2128            .map(|f| f.name().to_string())
2129            .collect();
2130
2131        assert_eq!(cols, vec!["date"]);
2132
2133        Ok(())
2134    }
2135}