// datafusion_materialized_views/materialized/row_metadata.rs
1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements. See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership. The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License. You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied. See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18use dashmap::DashMap;
19use datafusion::catalog::TableProvider;
20use datafusion_common::{DataFusionError, Result};
21use datafusion_expr::{LogicalPlanBuilder, TableScan};
22use datafusion_sql::ResolvedTableReference;
23use std::{collections::BTreeMap, sync::Arc};
24
25use super::{file_metadata::FileMetadata, hive_partition::hive_partition, META_COLUMN};
26
/// Registry that manages metadata sources for different tables.
/// Provides a centralized way to register and retrieve metadata sources
/// that can be used to obtain row-level metadata for tables.
pub struct RowMetadataRegistry {
    /// Sources keyed by the stringified [`ResolvedTableReference`] they were
    /// registered under (see [`Self::register_source`]).
    metadata_sources: DashMap<String, Arc<dyn RowMetadataSource>>,
    /// Fallback returned by [`Self::get_source`] for tables with no explicit
    /// entry; `None` when constructed via [`Self::new_empty`].
    default_source: Option<Arc<dyn RowMetadataSource>>,
}
34
35impl std::fmt::Debug for RowMetadataRegistry {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 f.debug_struct("RowMetadataRegistry")
38 .field(
39 "metadata_sources",
40 &self
41 .metadata_sources
42 .iter()
43 .map(|r| (r.key().clone(), r.value().name().to_string()))
44 .collect::<BTreeMap<_, _>>(),
45 )
46 .field(
47 "default_source",
48 &self.default_source.as_ref().map(|s| s.name()),
49 )
50 .finish()
51 }
52}
53
54impl RowMetadataRegistry {
55 /// Initializes this `RowMetadataRegistry` with a [`FileMetadata`] as the default
56 /// row metadata source.
57 ///
58 /// This should be the typical entrypoint for most usecases.
59 pub fn new(file_metadata: Arc<FileMetadata>) -> Self {
60 Self::new_with_default_source(Arc::new(ObjectStoreRowMetadataSource::new(file_metadata))
61 as Arc<dyn RowMetadataSource + 'static>)
62 }
63
64 /// Initializes this `RowMetadataRegistry` with a default [`RowMetadataSource`]
65 /// to be used if a table has not been explicitly registered with a specific source.
66 ///
67 /// Typically the [`FileMetadata`] source should be used as the default.
68 pub fn new_with_default_source(default_source: Arc<dyn RowMetadataSource>) -> Self {
69 Self {
70 metadata_sources: Default::default(),
71 default_source: Some(default_source),
72 }
73 }
74
75 /// Initializes a new `RowMetadataRegistry` with no default [`RowMetadataSource`].
76 ///
77 /// Users should typically use [`RowMetadataRegistry::new_with_default_source`].
78 pub fn new_empty() -> Self {
79 Self {
80 metadata_sources: Default::default(),
81 default_source: None,
82 }
83 }
84
85 /// Registers a metadata source for a specific table.
86 /// Returns the previously registered source for this table, if any
87 pub fn register_source(
88 &self,
89 table: &ResolvedTableReference,
90 source: Arc<dyn RowMetadataSource>,
91 ) -> Option<Arc<dyn RowMetadataSource>> {
92 self.metadata_sources.insert(table.to_string(), source)
93 }
94
95 /// Retrieves the registered [`RowMetadataSource`] for a specific table.
96 pub fn get_source(&self, table: &ResolvedTableReference) -> Result<Arc<dyn RowMetadataSource>> {
97 self.metadata_sources
98 .get(&table.to_string())
99 .map(|o| Arc::clone(o.value()))
100 .or_else(|| self.default_source.clone())
101 .ok_or_else(|| DataFusionError::Internal(format!("No metadata source for {table}")))
102 }
103}
104
/// A source for "row metadata", that associates rows from a table with
/// metadata used for incremental view maintenance.
///
/// Most use cases should default to using [`FileMetadata`] for their [`RowMetadataSource`],
/// which uses object store metadata to perform incremental view maintenance on Hive-partitioned tables.
/// However, in some use cases it is necessary to track metadata at a more granular level than Hive partitions.
/// In such cases, users may implement a custom [`RowMetadataSource`] containing this metadata.
///
/// A [`RowMetadataSource`] may contain metadata for multiple tables.
/// As such, it is the user's responsibility to register each table with the appropriate
/// [`RowMetadataSource`] in the [`RowMetadataRegistry`].
pub trait RowMetadataSource: Send + Sync {
    /// The name of this row metadata source.
    /// Shown when formatting a [`RowMetadataRegistry`] with `Debug`.
    fn name(&self) -> &str;

    /// Rewrite this [`TableScan`] as query against this [`RowMetadataSource`],
    /// this time adding a new struct column, `__meta` (aliased from [`META_COLUMN`]),
    /// whose shape conforms to the following schema:
    ///
    /// ```json
    /// {
    ///     "table_catalog": "string",
    ///     "table_schema": "string",
    ///     "table_name": "string",
    ///     "source_uri": "string",
    ///     "last_modified": "timestamp",
    /// }
    /// ```
    ///
    /// The returned data should contain the original table scan, up to multiplicity.
    /// That is, for each row in the original table scan, the [`RowMetadataSource`] should contain at least
    /// one row (but potentially more) with the same values, plus the `__meta` column.
    fn row_metadata(
        &self,
        table: ResolvedTableReference,
        scan: &TableScan,
    ) -> Result<LogicalPlanBuilder>;
}
142
/// A [`RowMetadataSource`] that uses an object storage API to retrieve
/// partition columns and timestamp metadata.
///
/// Object store metadata by default comes from [`FileMetadata`], but
/// may be overridden with a custom [`TableProvider`] using
/// [`Self::with_file_metadata`].
#[derive(Debug, Clone)]
pub struct ObjectStoreRowMetadataSource {
    /// Table of per-object metadata that `row_metadata` scans in place of
    /// the source table itself.
    file_metadata: Arc<dyn TableProvider>,
}
153
154impl ObjectStoreRowMetadataSource {
155 /// Create a new [`ObjectStoreRowMetadataSource`] from the [`FileMetadata`] table
156 pub fn new(file_metadata: Arc<FileMetadata>) -> Self {
157 Self::with_file_metadata(file_metadata)
158 }
159
160 /// Create a new [`ObjectStoreRowMetadataSource`] using a custom file metadata source
161 pub fn with_file_metadata(file_metadata: Arc<dyn TableProvider>) -> Self {
162 Self { file_metadata }
163 }
164}
165
166impl RowMetadataSource for ObjectStoreRowMetadataSource {
167 fn name(&self) -> &str {
168 "ObjectStoreRowMetadataSource"
169 }
170
171 /// Scan for partition column values using object store metadata.
172 /// This allows us to efficiently scan for distinct partition column values without
173 /// ever reading from a table directly, which is useful for low-overhead
174 /// incremental view maintenance.
175 fn row_metadata(
176 &self,
177 table: datafusion_sql::ResolvedTableReference,
178 scan: &datafusion_expr::TableScan,
179 ) -> Result<datafusion_expr::LogicalPlanBuilder> {
180 use datafusion::{datasource::provider_as_source, prelude::*};
181
182 // Disable this check in tests
183 #[cfg(not(test))]
184 {
185 // Check that the remaining columns in the source table scans are indeed partition columns
186 let partition_cols = super::cast_to_listing_table(
187 datafusion::datasource::source_as_provider(&scan.source)?.as_ref(),
188 )
189 .ok_or_else(|| {
190 DataFusionError::Internal(format!(
191 "Table '{}' was not registered in TableTypeRegistry",
192 scan.table_name
193 ))
194 })?
195 .partition_columns();
196
197 for column in scan.projected_schema.columns() {
198 if !partition_cols.contains(&column.name) {
199 return Err(DataFusionError::Internal(format!("Row metadata not available on non-partition column from source table '{table}': {}", column.name)));
200 }
201 }
202 }
203
204 let fields = scan.projected_schema.fields();
205
206 let row_metadata_expr = named_struct(vec![
207 lit("table_catalog"),
208 col("table_catalog"),
209 lit("table_schema"),
210 col("table_schema"),
211 lit("table_name"),
212 col("table_name"),
213 lit("source_uri"), // Map file_path to source_uri
214 col("file_path"),
215 lit("last_modified"),
216 col("last_modified"),
217 ])
218 .alias(META_COLUMN);
219
220 datafusion_expr::LogicalPlanBuilder::scan(
221 "file_metadata",
222 provider_as_source(Arc::clone(&self.file_metadata)),
223 None,
224 )?
225 .filter(
226 col("table_catalog")
227 .eq(lit(table.catalog.as_ref()))
228 .and(col("table_schema").eq(lit(table.schema.as_ref())))
229 .and(col("table_name").eq(lit(table.table.as_ref()))),
230 )?
231 .project(
232 fields
233 .iter()
234 .map(|field| {
235 // CAST(hive_partition(file_path, 'field_name', true) AS field_data_type) AS field_name
236 cast(
237 hive_partition(vec![col("file_path"), lit(field.name()), lit(true)]),
238 field.data_type().clone(),
239 )
240 .alias(field.name())
241 })
242 .chain(Some(row_metadata_expr)),
243 )
244 }
245}