datafusion_materialized_views/
materialized.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
/// Track dependencies of materialized data in object storage.
pub mod dependencies;

/// Pluggable metadata sources for incremental view maintenance.
pub mod row_metadata;

/// A virtual table that exposes files in object storage.
pub mod file_metadata;

/// A UDF that parses Hive partition elements from object storage paths.
mod hive_partition;

/// Some private utility functions.
mod util;
32
33use std::{
34    any::{type_name, Any, TypeId},
35    fmt::Debug,
36    sync::{Arc, LazyLock},
37};
38
39use dashmap::DashMap;
40use datafusion::{
41    catalog::TableProvider,
42    datasource::listing::{ListingTable, ListingTableUrl},
43};
44use datafusion_expr::LogicalPlan;
45use itertools::Itertools;
46
/// The identifier of the column that
/// [`RowMetadataSource`](row_metadata::RowMetadataSource) implementations should
/// store row metadata in.
///
/// The column is named `__meta`.
pub const META_COLUMN: &str = "__meta";
49
/// The singleton registry of known [`ListingTableLike`] and [`Materialized`] implementations.
///
/// It is built lazily through [`TableTypeRegistry::default`], which registers
/// [`ListingTable`] as a `ListingTableLike` up front.
static TABLE_TYPE_REGISTRY: LazyLock<TableTypeRegistry> = LazyLock::new(TableTypeRegistry::default);
51
/// A [`TableProvider`] whose data is backed by Hive-partitioned files in object storage.
///
/// [`ListingTable`] is registered by default. Any other implementation must be
/// registered with [`register_listing_table`] (or [`register_materialized`]) before
/// [`cast_to_listing_table`] can recognize it.
pub trait ListingTableLike: TableProvider + 'static {
    /// Object store URLs for this table
    fn table_paths(&self) -> Vec<ListingTableUrl>;

    /// Hive partition columns
    fn partition_columns(&self) -> Vec<String>;

    /// File extension used by this listing table
    fn file_ext(&self) -> String;
}
63
64impl ListingTableLike for ListingTable {
65    fn table_paths(&self) -> Vec<ListingTableUrl> {
66        self.table_paths().clone()
67    }
68
69    fn partition_columns(&self) -> Vec<String> {
70        self.options()
71            .table_partition_cols
72            .iter()
73            .map(|(name, _data_type)| name.clone())
74            .collect_vec()
75    }
76
77    fn file_ext(&self) -> String {
78        self.options().file_extension.clone()
79    }
80}
81
82/// Register a [`ListingTableLike`] implementation in this registry.
83/// This allows `cast_to_listing_table` to easily downcast a [`TableProvider`]
84/// into a `ListingTableLike` where possible.
85pub fn register_listing_table<T: ListingTableLike>() {
86    TABLE_TYPE_REGISTRY.register_listing_table::<T>();
87}
88
89/// Attempt to cast the given TableProvider into a [`ListingTableLike`].
90/// If the table's type has not been registered using [`register_listing_table`], will return `None`.
91pub fn cast_to_listing_table(table: &dyn TableProvider) -> Option<&dyn ListingTableLike> {
92    TABLE_TYPE_REGISTRY.cast_to_listing_table(table)
93}
94
/// A hive-partitioned table in object storage that is defined by a user-provided query.
///
/// Implementations must be registered with [`register_materialized`] before
/// [`cast_to_materialized`] can recognize them.
pub trait Materialized: ListingTableLike {
    /// The query that defines this materialized view.
    fn query(&self) -> LogicalPlan;
}
100
101/// Register a [`Materialized`] implementation in this registry.
102/// This allows `cast_to_materialized` to easily downcast a [`TableProvider`]
103/// into a `Materialized` where possible.
104///
105/// Note that this will also register `T` as a [`ListingTableLike`].
106pub fn register_materialized<T: Materialized>() {
107    TABLE_TYPE_REGISTRY.register_materialized::<T>();
108}
109
110/// Attempt to cast the given TableProvider into a [`Materialized`].
111/// If the table's type has not been registered using [`register_materialized`], will return `None`.
112pub fn cast_to_materialized(table: &dyn TableProvider) -> Option<&dyn Materialized> {
113    TABLE_TYPE_REGISTRY.cast_to_materialized(table)
114}
115
/// A shareable, `Send + Sync` function that tries to turn a `&dyn Any` into a
/// reference to `T`, yielding `None` when it cannot.
///
/// In this file `T` is instantiated with the trait-object types
/// `dyn ListingTableLike` and `dyn Materialized`.
type Downcaster<T> = Arc<dyn Fn(&dyn Any) -> Option<&T> + Send + Sync>;
117
/// A registry for implementations of library-defined traits, used for downcasting
/// arbitrary TableProviders into `ListingTableLike` and `Materialized` trait objects where possible.
///
/// This is used throughout the crate as a singleton to store all known implementations of `ListingTableLike` and `Materialized`.
/// By default, [`ListingTable`] is registered as a `ListingTableLike`.
struct TableTypeRegistry {
    /// Known `ListingTableLike` implementations, keyed by the concrete type's `TypeId`.
    /// Each value pairs the type's name (as reported by `type_name`) with its downcaster.
    listing_table_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn ListingTableLike>)>,
    /// Known `Materialized` implementations, keyed by the concrete type's `TypeId`.
    /// Each value pairs the type's name (as reported by `type_name`) with its downcaster.
    materialized_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn Materialized>)>,
}
127
128impl Debug for TableTypeRegistry {
129    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130        f.debug_struct("TableTypeRegistry")
131            .field(
132                "listing_table_accessors",
133                &self
134                    .listing_table_accessors
135                    .iter()
136                    .map(|r| r.value().0)
137                    .collect_vec(),
138            )
139            .finish()
140    }
141}
142
143impl Default for TableTypeRegistry {
144    fn default() -> Self {
145        let new = Self {
146            listing_table_accessors: DashMap::new(),
147            materialized_accessors: DashMap::new(),
148        };
149        new.register_listing_table::<ListingTable>();
150
151        new
152    }
153}
154
155impl TableTypeRegistry {
156    fn register_listing_table<T: ListingTableLike>(&self) {
157        self.listing_table_accessors.insert(
158            TypeId::of::<T>(),
159            (
160                type_name::<T>(),
161                Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn ListingTableLike)),
162            ),
163        );
164    }
165
166    fn register_materialized<T: Materialized>(&self) {
167        self.materialized_accessors.insert(
168            TypeId::of::<T>(),
169            (
170                type_name::<T>(),
171                Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn Materialized)),
172            ),
173        );
174
175        self.register_listing_table::<T>();
176    }
177
178    fn cast_to_listing_table<'a>(
179        &'a self,
180        table: &'a dyn TableProvider,
181    ) -> Option<&'a dyn ListingTableLike> {
182        self.listing_table_accessors
183            .get(&table.as_any().type_id())
184            .and_then(|r| r.value().1(table.as_any()))
185    }
186
187    fn cast_to_materialized<'a>(
188        &'a self,
189        table: &'a dyn TableProvider,
190    ) -> Option<&'a dyn Materialized> {
191        self.materialized_accessors
192            .get(&table.as_any().type_id())
193            .and_then(|r| r.value().1(table.as_any()))
194    }
195}