datafusion_materialized_views/
materialized.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18pub mod dependencies;
19
20/// Pluggable metadata sources for incremental view maintenance
21pub mod row_metadata;
22
23/// A virtual table that exposes files in object storage.
24pub mod file_metadata;
25
26/// A UDF that parses Hive partition elements from object storage paths.
27mod hive_partition;
28
29/// Some private utility functions
30mod util;
31
32use std::{
33    any::{type_name, Any, TypeId},
34    fmt::Debug,
35    sync::{Arc, LazyLock},
36};
37
38use dashmap::DashMap;
39use datafusion::{
40    catalog::TableProvider,
41    datasource::listing::{ListingTable, ListingTableUrl},
42};
43use datafusion_common::DataFusionError;
44use datafusion_expr::LogicalPlan;
45use itertools::Itertools;
46
47use crate::MaterializedConfig;
48
/// Name of the column in which [`RowMetadataSource`](row_metadata::RowMetadataSource)
/// implementations are expected to store row metadata.
pub const META_COLUMN: &str = "__meta";
51
52static TABLE_TYPE_REGISTRY: LazyLock<TableTypeRegistry> = LazyLock::new(TableTypeRegistry::default);
53
54/// A [`TableProvider`] whose data is backed by Hive-partitioned files in object storage.
55pub trait ListingTableLike: TableProvider + 'static {
56    /// Object store URLs for this table
57    fn table_paths(&self) -> Vec<ListingTableUrl>;
58
59    /// Hive partition columns
60    fn partition_columns(&self) -> Vec<String>;
61
62    /// File extension used by this listing table
63    fn file_ext(&self) -> String;
64}
65
66impl ListingTableLike for ListingTable {
67    fn table_paths(&self) -> Vec<ListingTableUrl> {
68        self.table_paths().clone()
69    }
70
71    fn partition_columns(&self) -> Vec<String> {
72        self.options()
73            .table_partition_cols
74            .iter()
75            .map(|(name, _data_type)| name.clone())
76            .collect_vec()
77    }
78
79    fn file_ext(&self) -> String {
80        self.options().file_extension.clone()
81    }
82}
83
84/// Register a [`ListingTableLike`] implementation in this registry.
85/// This allows `cast_to_listing_table` to easily downcast a [`TableProvider`]
86/// into a `ListingTableLike` where possible.
87pub fn register_listing_table<T: ListingTableLike>() {
88    TABLE_TYPE_REGISTRY.register_listing_table::<T>();
89}
90
91/// Attempt to cast the given TableProvider into a [`ListingTableLike`].
92/// If the table's type has not been registered using [`register_listing_table`], will return `None`.
93pub fn cast_to_listing_table(table: &dyn TableProvider) -> Option<&dyn ListingTableLike> {
94    TABLE_TYPE_REGISTRY
95        .cast_to_listing_table(table)
96        .or_else(|| {
97            TABLE_TYPE_REGISTRY
98                .cast_to_decorator(table)
99                .and_then(|decorator| cast_to_listing_table(decorator.base()))
100        })
101}
102
103/// A hive-partitioned table in object storage that is defined by a user-provided query.
104pub trait Materialized: ListingTableLike {
105    /// The query that defines this materialized view.
106    fn query(&self) -> LogicalPlan;
107
108    /// Configuration to control materialized view related features.
109    /// By default, returns the default value for [`MaterializedConfig`]
110    fn config(&self) -> MaterializedConfig {
111        MaterializedConfig::default()
112    }
113
114    /// Which partition columns are 'static'.
115    /// Static partition columns are those that are used in incremental view maintenance.
116    /// These should be a prefix of the full set of partition columns returned by [`ListingTableLike::partition_columns`].
117    /// The rest of the partition columns are 'dynamic' and their values will be generated at runtime during incremental refresh.
118    fn static_partition_columns(&self) -> Vec<String> {
119        <Self as ListingTableLike>::partition_columns(self)
120    }
121}
122
123/// Register a [`Materialized`] implementation in this registry.
124/// This allows `cast_to_materialized` to easily downcast a [`TableProvider`]
125/// into a `Materialized` where possible.
126///
127/// Note that this will also register `T` as a [`ListingTableLike`].
128pub fn register_materialized<T: Materialized>() {
129    TABLE_TYPE_REGISTRY.register_materialized::<T>();
130}
131
132/// Attempt to cast the given TableProvider into a [`Materialized`].
133/// If the table's type has not been registered using [`register_materialized`], will return `Ok(None)`.
134///
135/// Does a runtime check on some invariants of `Materialized` and returns an error if they are violated.
136/// In particular, checks that the static partition columns are a prefix of all partition columns.
137pub fn cast_to_materialized(
138    table: &dyn TableProvider,
139) -> Result<Option<&dyn Materialized>, DataFusionError> {
140    let materialized = match TABLE_TYPE_REGISTRY
141        .cast_to_materialized(table)
142        .map(Ok)
143        .or_else(|| {
144            TABLE_TYPE_REGISTRY
145                .cast_to_decorator(table)
146                .and_then(|decorator| cast_to_materialized(decorator.base()).transpose())
147        })
148        .transpose()?
149    {
150        None => return Ok(None),
151        Some(m) => m,
152    };
153
154    let static_partition_cols = materialized.static_partition_columns();
155    let all_partition_cols = materialized.partition_columns();
156
157    if materialized.partition_columns()[..static_partition_cols.len()] != static_partition_cols[..]
158    {
159        return Err(DataFusionError::Plan(format!(
160            "Materialized view's static partition columns ({static_partition_cols:?}) must be a prefix of all partition columns ({all_partition_cols:?})"
161        )));
162    }
163
164    Ok(Some(materialized))
165}
166
167/// A `TableProvider` that decorates other `TableProvider`s.
168/// Sometimes users may implement a `TableProvider` that overrides functionality of a base `TableProvider`.
169/// This API allows the decorator to also be recognized as `ListingTableLike` or `Materialized` automatically.
170pub trait Decorator: TableProvider + 'static {
171    /// The underlying `TableProvider` that this decorator wraps.
172    fn base(&self) -> &dyn TableProvider;
173}
174
175/// Register `T` as a [`Decorator`].
176pub fn register_decorator<T: Decorator>() {
177    TABLE_TYPE_REGISTRY.register_decorator::<T>()
178}
179
/// A shared, thread-safe function that tries to view a `&dyn Any` as a `&T`.
/// The returned reference borrows from the input reference.
type Downcaster<T> = Arc<dyn for<'a> Fn(&'a dyn Any) -> Option<&'a T> + Send + Sync>;
181
182/// A registry for implementations of library-defined traits, used for downcasting
183/// arbitrary TableProviders into `ListingTableLike` and `Materialized` trait objects where possible.
184///
185/// This is used throughout the crate as a singleton to store all known implementations of `ListingTableLike` and `Materialized`.
186/// By default, [`ListingTable`] is registered as a `ListingTableLike`.
187struct TableTypeRegistry {
188    listing_table_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn ListingTableLike>)>,
189    materialized_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn Materialized>)>,
190    decorator_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn Decorator>)>,
191}
192
193impl Debug for TableTypeRegistry {
194    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195        f.debug_struct("TableTypeRegistry")
196            .field(
197                "listing_table_accessors",
198                &self
199                    .listing_table_accessors
200                    .iter()
201                    .map(|r| r.value().0)
202                    .collect_vec(),
203            )
204            .finish()
205    }
206}
207
208impl Default for TableTypeRegistry {
209    fn default() -> Self {
210        let new = Self {
211            listing_table_accessors: DashMap::new(),
212            materialized_accessors: DashMap::new(),
213            decorator_accessors: DashMap::new(),
214        };
215        new.register_listing_table::<ListingTable>();
216
217        new
218    }
219}
220
221impl TableTypeRegistry {
222    fn register_listing_table<T: ListingTableLike>(&self) {
223        self.listing_table_accessors.insert(
224            TypeId::of::<T>(),
225            (
226                type_name::<T>(),
227                Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn ListingTableLike)),
228            ),
229        );
230    }
231
232    fn register_materialized<T: Materialized>(&self) {
233        self.materialized_accessors.insert(
234            TypeId::of::<T>(),
235            (
236                type_name::<T>(),
237                Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn Materialized)),
238            ),
239        );
240
241        self.register_listing_table::<T>();
242    }
243
244    fn register_decorator<T: Decorator>(&self) {
245        self.decorator_accessors.insert(
246            TypeId::of::<T>(),
247            (
248                type_name::<T>(),
249                Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn Decorator)),
250            ),
251        );
252    }
253
254    fn cast_to_listing_table<'a>(
255        &'a self,
256        table: &'a dyn TableProvider,
257    ) -> Option<&'a dyn ListingTableLike> {
258        self.listing_table_accessors
259            .get(&table.as_any().type_id())
260            .and_then(|r| r.value().1(table.as_any()))
261    }
262
263    fn cast_to_materialized<'a>(
264        &'a self,
265        table: &'a dyn TableProvider,
266    ) -> Option<&'a dyn Materialized> {
267        self.materialized_accessors
268            .get(&table.as_any().type_id())
269            .and_then(|r| r.value().1(table.as_any()))
270    }
271
272    fn cast_to_decorator<'a>(&'a self, table: &'a dyn TableProvider) -> Option<&'a dyn Decorator> {
273        self.decorator_accessors
274            .get(&table.as_any().type_id())
275            .and_then(|r| r.value().1(table.as_any()))
276    }
277}