datafusion_materialized_views/
materialized.rs1pub mod dependencies;
19
20pub mod row_metadata;
22
23pub mod file_metadata;
25
26mod hive_partition;
28
29mod util;
31
32use std::{
33 any::{type_name, Any, TypeId},
34 fmt::Debug,
35 sync::{Arc, LazyLock},
36};
37
38use dashmap::DashMap;
39use datafusion::{
40 catalog::TableProvider,
41 datasource::listing::{ListingTable, ListingTableUrl},
42};
43use datafusion_common::DataFusionError;
44use datafusion_expr::LogicalPlan;
45use itertools::Itertools;
46
47use crate::MaterializedConfig;
48
/// Column name `"__meta"`.
///
/// NOTE(review): nothing in this file reads the constant; presumably it names the column that
/// the row-metadata submodules store their metadata in — confirm against its users.
pub const META_COLUMN: &str = "__meta";
51
52static TABLE_TYPE_REGISTRY: LazyLock<TableTypeRegistry> = LazyLock::new(TableTypeRegistry::default);
53
54pub trait ListingTableLike: TableProvider + 'static {
56 fn table_paths(&self) -> Vec<ListingTableUrl>;
58
59 fn partition_columns(&self) -> Vec<String>;
61
62 fn file_ext(&self) -> String;
64}
65
66impl ListingTableLike for ListingTable {
67 fn table_paths(&self) -> Vec<ListingTableUrl> {
68 self.table_paths().clone()
69 }
70
71 fn partition_columns(&self) -> Vec<String> {
72 self.options()
73 .table_partition_cols
74 .iter()
75 .map(|(name, _data_type)| name.clone())
76 .collect_vec()
77 }
78
79 fn file_ext(&self) -> String {
80 self.options().file_extension.clone()
81 }
82}
83
84pub fn register_listing_table<T: ListingTableLike>() {
88 TABLE_TYPE_REGISTRY.register_listing_table::<T>();
89}
90
91pub fn cast_to_listing_table(table: &dyn TableProvider) -> Option<&dyn ListingTableLike> {
94 TABLE_TYPE_REGISTRY
95 .cast_to_listing_table(table)
96 .or_else(|| {
97 TABLE_TYPE_REGISTRY
98 .cast_to_decorator(table)
99 .and_then(|decorator| cast_to_listing_table(decorator.base()))
100 })
101}
102
103pub trait Materialized: ListingTableLike {
105 fn query(&self) -> LogicalPlan;
107
108 fn config(&self) -> MaterializedConfig {
111 MaterializedConfig::default()
112 }
113
114 fn static_partition_columns(&self) -> Vec<String> {
119 <Self as ListingTableLike>::partition_columns(self)
120 }
121}
122
123pub fn register_materialized<T: Materialized>() {
129 TABLE_TYPE_REGISTRY.register_materialized::<T>();
130}
131
132pub fn cast_to_materialized(
138 table: &dyn TableProvider,
139) -> Result<Option<&dyn Materialized>, DataFusionError> {
140 let materialized = match TABLE_TYPE_REGISTRY
141 .cast_to_materialized(table)
142 .map(Ok)
143 .or_else(|| {
144 TABLE_TYPE_REGISTRY
145 .cast_to_decorator(table)
146 .and_then(|decorator| cast_to_materialized(decorator.base()).transpose())
147 })
148 .transpose()?
149 {
150 None => return Ok(None),
151 Some(m) => m,
152 };
153
154 let static_partition_cols = materialized.static_partition_columns();
155 let all_partition_cols = materialized.partition_columns();
156
157 if materialized.partition_columns()[..static_partition_cols.len()] != static_partition_cols[..]
158 {
159 return Err(DataFusionError::Plan(format!(
160 "Materialized view's static partition columns ({static_partition_cols:?}) must be a prefix of all partition columns ({all_partition_cols:?})"
161 )));
162 }
163
164 Ok(Some(materialized))
165}
166
167pub trait Decorator: TableProvider + 'static {
171 fn base(&self) -> &dyn TableProvider;
173}
174
175pub fn register_decorator<T: Decorator>() {
177 TABLE_TYPE_REGISTRY.register_decorator::<T>()
178}
179
/// A shareable, thread-safe function that tries to view a `&dyn Any` as a `&T`, returning
/// `None` when the value is not of a type it knows how to convert.
///
/// `T` is instantiated with trait object types (e.g. `dyn ListingTableLike`) by the registry.
type Downcaster<T> = Arc<dyn Fn(&dyn Any) -> Option<&T> + Send + Sync>;
181
182struct TableTypeRegistry {
188 listing_table_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn ListingTableLike>)>,
189 materialized_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn Materialized>)>,
190 decorator_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn Decorator>)>,
191}
192
193impl Debug for TableTypeRegistry {
194 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
195 f.debug_struct("TableTypeRegistry")
196 .field(
197 "listing_table_accessors",
198 &self
199 .listing_table_accessors
200 .iter()
201 .map(|r| r.value().0)
202 .collect_vec(),
203 )
204 .finish()
205 }
206}
207
208impl Default for TableTypeRegistry {
209 fn default() -> Self {
210 let new = Self {
211 listing_table_accessors: DashMap::new(),
212 materialized_accessors: DashMap::new(),
213 decorator_accessors: DashMap::new(),
214 };
215 new.register_listing_table::<ListingTable>();
216
217 new
218 }
219}
220
221impl TableTypeRegistry {
222 fn register_listing_table<T: ListingTableLike>(&self) {
223 self.listing_table_accessors.insert(
224 TypeId::of::<T>(),
225 (
226 type_name::<T>(),
227 Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn ListingTableLike)),
228 ),
229 );
230 }
231
232 fn register_materialized<T: Materialized>(&self) {
233 self.materialized_accessors.insert(
234 TypeId::of::<T>(),
235 (
236 type_name::<T>(),
237 Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn Materialized)),
238 ),
239 );
240
241 self.register_listing_table::<T>();
242 }
243
244 fn register_decorator<T: Decorator>(&self) {
245 self.decorator_accessors.insert(
246 TypeId::of::<T>(),
247 (
248 type_name::<T>(),
249 Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn Decorator)),
250 ),
251 );
252 }
253
254 fn cast_to_listing_table<'a>(
255 &'a self,
256 table: &'a dyn TableProvider,
257 ) -> Option<&'a dyn ListingTableLike> {
258 self.listing_table_accessors
259 .get(&table.as_any().type_id())
260 .and_then(|r| r.value().1(table.as_any()))
261 }
262
263 fn cast_to_materialized<'a>(
264 &'a self,
265 table: &'a dyn TableProvider,
266 ) -> Option<&'a dyn Materialized> {
267 self.materialized_accessors
268 .get(&table.as_any().type_id())
269 .and_then(|r| r.value().1(table.as_any()))
270 }
271
272 fn cast_to_decorator<'a>(&'a self, table: &'a dyn TableProvider) -> Option<&'a dyn Decorator> {
273 self.decorator_accessors
274 .get(&table.as_any().type_id())
275 .and_then(|r| r.value().1(table.as_any()))
276 }
277}