// datafusion_materialized_views/materialized/row_metadata.rs

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use dashmap::DashMap;
use datafusion::catalog::TableProvider;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{LogicalPlanBuilder, TableScan};
use datafusion_sql::ResolvedTableReference;
use std::{collections::BTreeMap, sync::Arc};

use super::{file_metadata::FileMetadata, hive_partition::hive_partition, META_COLUMN};

27/// Registry that manages metadata sources for different tables.
28/// Provides a centralized way to register and retrieve metadata sources
29/// that can be used to obtain row-level metadata for tables.
30pub struct RowMetadataRegistry {
31    metadata_sources: DashMap<String, Arc<dyn RowMetadataSource>>,
32    default_source: Option<Arc<dyn RowMetadataSource>>,
33}
34
35impl std::fmt::Debug for RowMetadataRegistry {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        f.debug_struct("RowMetadataRegistry")
38            .field(
39                "metadata_sources",
40                &self
41                    .metadata_sources
42                    .iter()
43                    .map(|r| (r.key().clone(), r.value().name().to_string()))
44                    .collect::<BTreeMap<_, _>>(),
45            )
46            .field(
47                "default_source",
48                &self.default_source.as_ref().map(|s| s.name()),
49            )
50            .finish()
51    }
52}
53
54impl RowMetadataRegistry {
55    /// Initializes this `RowMetadataRegistry` with a [`FileMetadata`] as the default
56    /// row metadata source.
57    ///
58    /// This should be the typical entrypoint for most usecases.
59    pub fn new(file_metadata: Arc<FileMetadata>) -> Self {
60        Self::new_with_default_source(Arc::new(ObjectStoreRowMetadataSource::new(file_metadata))
61            as Arc<dyn RowMetadataSource + 'static>)
62    }
63
64    /// Initializes this `RowMetadataRegistry` with a default [`RowMetadataSource`]
65    /// to be used if a table has not been explicitly registered with a specific source.
66    ///
67    /// Typically the [`FileMetadata`] source should be used as the default.
68    pub fn new_with_default_source(default_source: Arc<dyn RowMetadataSource>) -> Self {
69        Self {
70            metadata_sources: Default::default(),
71            default_source: Some(default_source),
72        }
73    }
74
75    /// Initializes a new `RowMetadataRegistry` with no default [`RowMetadataSource`].
76    ///
77    /// Users should typically use [`RowMetadataRegistry::new_with_default_source`].
78    pub fn new_empty() -> Self {
79        Self {
80            metadata_sources: Default::default(),
81            default_source: None,
82        }
83    }
84
85    /// Registers a metadata source for a specific table.
86    /// Returns the previously registered source for this table, if any
87    pub fn register_source(
88        &self,
89        table: &ResolvedTableReference,
90        source: Arc<dyn RowMetadataSource>,
91    ) -> Option<Arc<dyn RowMetadataSource>> {
92        self.metadata_sources.insert(table.to_string(), source)
93    }
94
95    /// Retrieves the registered [`RowMetadataSource`] for a specific table.
96    pub fn get_source(&self, table: &ResolvedTableReference) -> Result<Arc<dyn RowMetadataSource>> {
97        self.metadata_sources
98            .get(&table.to_string())
99            .map(|o| Arc::clone(o.value()))
100            .or_else(|| self.default_source.clone())
101            .ok_or_else(|| DataFusionError::Internal(format!("No metadata source for {}", table)))
102    }
103}
104
105/// A source for "row metadata", that associates rows from a table with
106/// metadata used for incremental view maintenance.
107///
108/// Most use cases should default to using [`FileMetadata`] for their [`RowMetadataSource`],
109/// which uses object store metadata to perform incremental view maintenance on Hive-partitioned tables.
110/// However, in some use cases it is necessary to track metadata at a more granular level than Hive partitions.
111/// In such cases, users may implement a custom [`RowMetadataSource`] containing this metadata.
112///
113/// A [`RowMetadataSource`] may contain metadata for multiple tables.
114/// As such, it is the user's responsibility to register each table with the appropriate
115/// [`RowMetadataSource`] in the [`RowMetadataRegistry`].
116pub trait RowMetadataSource: Send + Sync {
117    /// The name of this row metadata source.
118    fn name(&self) -> &str;
119
120    /// Rewrite this [`TableScan`] as query against this [`RowMetadataSource`],
121    /// this time adding a new struct column, `__meta`, whose shape conforms to the following schema:
122    ///
123    /// ```json
124    /// {
125    ///     "table_catalog": "string",
126    ///     "table_schema": "string",
127    ///     "table_name": "string",
128    ///     "source_uri": "string",
129    ///     "last_modified": "timestamp",
130    /// }
131    /// ```
132    ///
133    /// The returned data should contain the original table scan, up to multiplicity.
134    /// That is, for each row in the original table scan, the [`RowMetadataSource`] should contain at least
135    /// one row (but potentially more) with the same values, plus the `__meta` column.
136    fn row_metadata(
137        &self,
138        table: ResolvedTableReference,
139        scan: &TableScan,
140    ) -> Result<LogicalPlanBuilder>;
141}
142
143/// A [`RowMetadataSource`] that uses an object storage API to retrieve
144/// partition columns and timestamp metadata.
145///
146/// Object store metadata by default comes from [`FileMetadata`], but
147/// may be overridden with a custom [`TableProvider`] using
148/// [`Self::with_file_metadata`].
149#[derive(Debug, Clone)]
150pub struct ObjectStoreRowMetadataSource {
151    file_metadata: Arc<dyn TableProvider>,
152}
153
154impl ObjectStoreRowMetadataSource {
155    /// Create a new [`ObjectStoreRowMetadataSource`] from the [`FileMetadata`] table
156    pub fn new(file_metadata: Arc<FileMetadata>) -> Self {
157        Self::with_file_metadata(file_metadata)
158    }
159
160    /// Create a new [`ObjectStoreRowMetadataSource`] using a custom file metadata source
161    pub fn with_file_metadata(file_metadata: Arc<dyn TableProvider>) -> Self {
162        Self { file_metadata }
163    }
164}
165
166impl RowMetadataSource for ObjectStoreRowMetadataSource {
167    fn name(&self) -> &str {
168        "ObjectStoreRowMetadataSource"
169    }
170
171    /// Scan for partition column values using object store metadata.
172    /// This allows us to efficiently scan for distinct partition column values without
173    /// ever reading from a table directly, which is useful for low-overhead
174    /// incremental view maintenance.
175    fn row_metadata(
176        &self,
177        table: datafusion_sql::ResolvedTableReference,
178        scan: &datafusion_expr::TableScan,
179    ) -> Result<datafusion_expr::LogicalPlanBuilder> {
180        use datafusion::{datasource::provider_as_source, prelude::*};
181
182        // Disable this check in tests
183        #[cfg(not(test))]
184        {
185            // Check that the remaining columns in the source table scans are indeed partition columns
186            let partition_cols = super::cast_to_listing_table(
187                datafusion::datasource::source_as_provider(&scan.source)?.as_ref(),
188            )
189            .ok_or_else(|| {
190                DataFusionError::Internal(format!(
191                    "Table '{}' was not registered in TableTypeRegistry",
192                    scan.table_name
193                ))
194            })?
195            .partition_columns();
196
197            for column in scan.projected_schema.columns() {
198                if !partition_cols.contains(&column.name) {
199                    return Err(DataFusionError::Internal(format!("Row metadata not available on non-partition column from source table '{table}': {}", column.name)));
200                }
201            }
202        }
203
204        let fields = scan.projected_schema.fields();
205
206        let row_metadata_expr = named_struct(vec![
207            lit("table_catalog"),
208            col("table_catalog"),
209            lit("table_schema"),
210            col("table_schema"),
211            lit("table_name"),
212            col("table_name"),
213            lit("source_uri"), // Map file_path to source_uri
214            col("file_path"),
215            lit("last_modified"),
216            col("last_modified"),
217        ])
218        .alias(META_COLUMN);
219
220        datafusion_expr::LogicalPlanBuilder::scan(
221            "file_metadata",
222            provider_as_source(Arc::clone(&self.file_metadata)),
223            None,
224        )?
225        .filter(
226            col("table_catalog")
227                .eq(lit(table.catalog.as_ref()))
228                .and(col("table_schema").eq(lit(table.schema.as_ref())))
229                .and(col("table_name").eq(lit(table.table.as_ref()))),
230        )?
231        .project(
232            fields
233                .iter()
234                .map(|field| {
235                    // CAST(hive_partition(file_path, 'field_name', true) AS field_data_type) AS field_name
236                    cast(
237                        hive_partition(vec![col("file_path"), lit(field.name()), lit(true)]),
238                        field.data_type().clone(),
239                    )
240                    .alias(field.name())
241                })
242                .chain(Some(row_metadata_expr)),
243        )
244    }
245}