datafusion_materialized_views/
materialized.rs1pub mod dependencies;
20
21pub mod row_metadata;
23
24pub mod file_metadata;
26
27mod hive_partition;
29
30mod util;
32
33use std::{
34 any::{type_name, Any, TypeId},
35 fmt::Debug,
36 sync::{Arc, LazyLock},
37};
38
39use dashmap::DashMap;
40use datafusion::{
41 catalog::TableProvider,
42 datasource::listing::{ListingTable, ListingTableUrl},
43};
44use datafusion_expr::LogicalPlan;
45use itertools::Itertools;
46
47pub const META_COLUMN: &str = "__meta";
49
50static TABLE_TYPE_REGISTRY: LazyLock<TableTypeRegistry> = LazyLock::new(TableTypeRegistry::default);
51
52pub trait ListingTableLike: TableProvider + 'static {
54 fn table_paths(&self) -> Vec<ListingTableUrl>;
56
57 fn partition_columns(&self) -> Vec<String>;
59
60 fn file_ext(&self) -> String;
62}
63
64impl ListingTableLike for ListingTable {
65 fn table_paths(&self) -> Vec<ListingTableUrl> {
66 self.table_paths().clone()
67 }
68
69 fn partition_columns(&self) -> Vec<String> {
70 self.options()
71 .table_partition_cols
72 .iter()
73 .map(|(name, _data_type)| name.clone())
74 .collect_vec()
75 }
76
77 fn file_ext(&self) -> String {
78 self.options().file_extension.clone()
79 }
80}
81
82pub fn register_listing_table<T: ListingTableLike>() {
86 TABLE_TYPE_REGISTRY.register_listing_table::<T>();
87}
88
89pub fn cast_to_listing_table(table: &dyn TableProvider) -> Option<&dyn ListingTableLike> {
92 TABLE_TYPE_REGISTRY.cast_to_listing_table(table)
93}
94
95pub trait Materialized: ListingTableLike {
97 fn query(&self) -> LogicalPlan;
99}
100
101pub fn register_materialized<T: Materialized>() {
107 TABLE_TYPE_REGISTRY.register_materialized::<T>();
108}
109
110pub fn cast_to_materialized(table: &dyn TableProvider) -> Option<&dyn Materialized> {
113 TABLE_TYPE_REGISTRY.cast_to_materialized(table)
114}
115
116type Downcaster<T> = Arc<dyn Fn(&dyn Any) -> Option<&T> + Send + Sync>;
117
118struct TableTypeRegistry {
124 listing_table_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn ListingTableLike>)>,
125 materialized_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn Materialized>)>,
126}
127
128impl Debug for TableTypeRegistry {
129 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
130 f.debug_struct("TableTypeRegistry")
131 .field(
132 "listing_table_accessors",
133 &self
134 .listing_table_accessors
135 .iter()
136 .map(|r| r.value().0)
137 .collect_vec(),
138 )
139 .finish()
140 }
141}
142
143impl Default for TableTypeRegistry {
144 fn default() -> Self {
145 let new = Self {
146 listing_table_accessors: DashMap::new(),
147 materialized_accessors: DashMap::new(),
148 };
149 new.register_listing_table::<ListingTable>();
150
151 new
152 }
153}
154
155impl TableTypeRegistry {
156 fn register_listing_table<T: ListingTableLike>(&self) {
157 self.listing_table_accessors.insert(
158 TypeId::of::<T>(),
159 (
160 type_name::<T>(),
161 Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn ListingTableLike)),
162 ),
163 );
164 }
165
166 fn register_materialized<T: Materialized>(&self) {
167 self.materialized_accessors.insert(
168 TypeId::of::<T>(),
169 (
170 type_name::<T>(),
171 Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn Materialized)),
172 ),
173 );
174
175 self.register_listing_table::<T>();
176 }
177
178 fn cast_to_listing_table<'a>(
179 &'a self,
180 table: &'a dyn TableProvider,
181 ) -> Option<&'a dyn ListingTableLike> {
182 self.listing_table_accessors
183 .get(&table.as_any().type_id())
184 .and_then(|r| r.value().1(table.as_any()))
185 }
186
187 fn cast_to_materialized<'a>(
188 &'a self,
189 table: &'a dyn TableProvider,
190 ) -> Option<&'a dyn Materialized> {
191 self.materialized_accessors
192 .get(&table.as_any().type_id())
193 .and_then(|r| r.value().1(table.as_any()))
194 }
195}