// datafusion_materialized_views/materialized/row_metadata.rs
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
use dashmap::DashMap;
use datafusion::catalog::TableProvider;
use datafusion_common::{DataFusionError, Result};
use datafusion_expr::{LogicalPlanBuilder, TableScan};
use datafusion_sql::ResolvedTableReference;
use std::{collections::BTreeMap, sync::Arc};
use super::{file_metadata::FileMetadata, hive_partition::hive_partition, META_COLUMN};
/// Central registry mapping tables to their [`RowMetadataSource`]s.
///
/// Callers register a metadata source per table and later look it up when
/// row-level metadata is needed; an optional fallback source is consulted for
/// tables that were never explicitly registered.
pub struct RowMetadataRegistry {
    /// Per-table sources, keyed by the table reference rendered as a string.
    metadata_sources: DashMap<String, Arc<dyn RowMetadataSource>>,
    /// Fallback used when a table has no explicit entry.
    default_source: Option<Arc<dyn RowMetadataSource>>,
}
impl std::fmt::Debug for RowMetadataRegistry {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Snapshot the concurrent map into an ordered map so the debug output
        // is deterministic; only each source's name is shown, since the
        // sources themselves are not required to implement `Debug`.
        let sources: BTreeMap<String, String> = self
            .metadata_sources
            .iter()
            .map(|entry| (entry.key().clone(), entry.value().name().to_string()))
            .collect();
        f.debug_struct("RowMetadataRegistry")
            .field("metadata_sources", &sources)
            .finish()
    }
}
impl RowMetadataRegistry {
    /// Creates a registry that falls back to `default_source` for any table
    /// that has not been explicitly registered with a specific source.
    ///
    /// Typically the [`FileMetadata`] source should be used as the default.
    pub fn new_with_default_source(default_source: Arc<dyn RowMetadataSource>) -> Self {
        Self {
            metadata_sources: Default::default(),
            default_source: Some(default_source),
        }
    }

    /// Creates a registry with no fallback [`RowMetadataSource`]; lookups for
    /// unregistered tables will return an error.
    ///
    /// Users should typically use [`RowMetadataRegistry::new_with_default_source`].
    pub fn new_empty() -> Self {
        Self {
            metadata_sources: Default::default(),
            default_source: None,
        }
    }

    /// Associates `table` with `source`.
    ///
    /// Returns the source previously registered for this table, if any.
    pub fn register_source(
        &self,
        table: &ResolvedTableReference,
        source: Arc<dyn RowMetadataSource>,
    ) -> Option<Arc<dyn RowMetadataSource>> {
        self.metadata_sources.insert(table.to_string(), source)
    }

    /// Returns the [`RowMetadataSource`] registered for `table`, falling back
    /// to the default source when no table-specific registration exists.
    ///
    /// # Errors
    /// Returns [`DataFusionError::Internal`] when the table is unregistered
    /// and no default source was configured.
    pub fn get_source(&self, table: &ResolvedTableReference) -> Result<Arc<dyn RowMetadataSource>> {
        let registered = self
            .metadata_sources
            .get(&table.to_string())
            .map(|entry| Arc::clone(entry.value()));
        registered
            .or_else(|| self.default_source.clone())
            .ok_or_else(|| DataFusionError::Internal(format!("No metadata source for {}", table)))
    }
}
/// A source for "row metadata", that associates rows from a table with
/// metadata used for incremental view maintenance.
///
/// Most use cases should default to using [`FileMetadata`] for their [`RowMetadataSource`],
/// which uses object store metadata to perform incremental view maintenance on Hive-partitioned tables.
/// However, in some use cases it is necessary to track metadata at a more granular level than Hive partitions.
/// In such cases, users may implement a custom [`RowMetadataSource`] containing this metadata.
///
/// A [`RowMetadataSource`] may contain metadata for multiple tables.
/// As such, it is the user's responsibility to register each table with the appropriate
/// [`RowMetadataSource`] in the [`RowMetadataRegistry`].
pub trait RowMetadataSource: Send + Sync {
/// The name of this row metadata source, used to identify it (e.g. in
/// [`RowMetadataRegistry`]'s `Debug` output).
fn name(&self) -> &str;
/// Rewrite this [`TableScan`] as query against this [`RowMetadataSource`],
/// this time adding a new struct column, `__meta`, whose shape conforms to the following schema:
///
/// ```json
/// {
/// "table_catalog": "string",
/// "table_schema": "string",
/// "table_name": "string",
/// "source_uri": "string",
/// "last_modified": "timestamp",
/// }
/// ```
///
/// The returned data should contain the original table scan, up to multiplicity.
/// That is, for each row in the original table scan, the [`RowMetadataSource`] should contain at least
/// one row (but potentially more) with the same values, plus the `__meta` column.
fn row_metadata(
&self,
table: ResolvedTableReference,
scan: &TableScan,
) -> Result<LogicalPlanBuilder>;
}
/// A [`RowMetadataSource`] that uses an object storage API to retrieve
/// partition columns and timestamp metadata.
///
/// Object store metadata by default comes from [`FileMetadata`], but
/// may be overridden with a custom [`TableProvider`] using
/// [`Self::with_file_metadata`].
#[derive(Debug, Clone)]
pub struct ObjectStoreRowMetadataSource {
// Table of per-file object store metadata; queried (never the data files
// themselves) to reconstruct partition values and the `__meta` column.
file_metadata: Arc<dyn TableProvider>,
}
impl ObjectStoreRowMetadataSource {
    /// Builds an [`ObjectStoreRowMetadataSource`] backed by the standard
    /// [`FileMetadata`] table.
    pub fn new(file_metadata: Arc<FileMetadata>) -> Self {
        // `Arc<FileMetadata>` coerces to `Arc<dyn TableProvider>` here.
        Self { file_metadata }
    }

    /// Builds an [`ObjectStoreRowMetadataSource`] from a custom file metadata
    /// source implementing [`TableProvider`].
    pub fn with_file_metadata(file_metadata: Arc<dyn TableProvider>) -> Self {
        Self { file_metadata }
    }
}
impl RowMetadataSource for ObjectStoreRowMetadataSource {
fn name(&self) -> &str {
"ObjectStoreRowMetadataSource"
}
/// Scan for partition column values using object store metadata.
/// This allows us to efficiently scan for distinct partition column values without
/// ever reading from a table directly, which is useful for low-overhead
/// incremental view maintenance.
fn row_metadata(
&self,
table: datafusion_sql::ResolvedTableReference,
scan: &datafusion_expr::TableScan,
) -> Result<datafusion_expr::LogicalPlanBuilder> {
use datafusion::{datasource::provider_as_source, prelude::*};
// Disable this check in tests
#[cfg(not(test))]
{
// Check that the remaining columns in the source table scans are indeed partition columns
let partition_cols = super::cast_to_listing_table(
datafusion::datasource::source_as_provider(&scan.source)?.as_ref(),
)
.ok_or_else(|| {
DataFusionError::Internal(format!(
"Table '{}' was not registered in TableTypeRegistry",
scan.table_name
))
})?
.partition_columns();
for column in scan.projected_schema.columns() {
if !partition_cols.contains(&column.name) {
return Err(DataFusionError::Internal(format!("Row metadata not available on non-partition column from source table '{table}': {}", column.name)));
}
}
}
let fields = scan.projected_schema.fields();
let row_metadata_expr = named_struct(vec![
lit("table_catalog"),
col("table_catalog"),
lit("table_schema"),
col("table_schema"),
lit("table_name"),
col("table_name"),
lit("source_uri"), // Map file_path to source_uri
col("file_path"),
lit("last_modified"),
col("last_modified"),
])
.alias(META_COLUMN);
datafusion_expr::LogicalPlanBuilder::scan(
"file_metadata",
provider_as_source(Arc::clone(&self.file_metadata)),
None,
)?
.filter(
col("table_catalog")
.eq(lit(table.catalog.as_ref()))
.and(col("table_schema").eq(lit(table.schema.as_ref())))
.and(col("table_name").eq(lit(table.table.as_ref()))),
)?
.project(
fields
.iter()
.map(|field| {
// CAST(hive_partition(file_path, 'field_name', true) AS field_data_type) AS field_name
cast(
hive_partition(vec![col("file_path"), lit(field.name()), lit(true)]),
field.data_type().clone(),
)
.alias(field.name())
})
.chain(Some(row_metadata_expr)),
)
}
}