datafusion_materialized_views/materialized.rs
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
/// Track dependencies of materialized data in object storage
pub mod dependencies;
/// Pluggable metadata sources for incremental view maintenance
pub mod row_metadata;
/// A virtual table that exposes files in object storage.
pub mod file_metadata;
/// A UDF that parses Hive partition elements from object storage paths.
mod hive_partition;
/// Some private utility functions
mod util;
use std::{
any::{type_name, Any, TypeId},
fmt::Debug,
sync::{Arc, LazyLock},
};
use dashmap::DashMap;
use datafusion::{
catalog::TableProvider,
datasource::listing::{ListingTable, ListingTableUrl},
};
use datafusion_expr::LogicalPlan;
use itertools::Itertools;
/// The identifier of the column that [`RowMetadataSource`](row_metadata::RowMetadataSource) implementations should store row metadata in.
pub const META_COLUMN: &str = "__meta";
static TABLE_TYPE_REGISTRY: LazyLock<TableTypeRegistry> = LazyLock::new(TableTypeRegistry::default);
/// A [`TableProvider`] whose data is backed by Hive-partitioned files in object storage.
pub trait ListingTableLike: TableProvider + 'static {
/// Object store URLs for this table
fn table_paths(&self) -> Vec<ListingTableUrl>;
/// Hive partition columns
fn partition_columns(&self) -> Vec<String>;
/// File extension used by this listing table
fn file_ext(&self) -> String;
}
impl ListingTableLike for ListingTable {
fn table_paths(&self) -> Vec<ListingTableUrl> {
self.table_paths().clone()
}
fn partition_columns(&self) -> Vec<String> {
self.options()
.table_partition_cols
.iter()
.map(|(name, _data_type)| name.clone())
.collect_vec()
}
fn file_ext(&self) -> String {
self.options().file_extension.clone()
}
}
/// Register a [`ListingTableLike`] implementation in this registry.
/// This allows `cast_to_listing_table` to easily downcast a [`TableProvider`]
/// into a `ListingTableLike` where possible.
pub fn register_listing_table<T: ListingTableLike>() {
TABLE_TYPE_REGISTRY.register_listing_table::<T>();
}
/// Attempt to cast the given TableProvider into a [`ListingTableLike`].
/// If the table's type has not been registered using [`register_listing_table`], will return `None`.
pub fn cast_to_listing_table(table: &dyn TableProvider) -> Option<&dyn ListingTableLike> {
TABLE_TYPE_REGISTRY.cast_to_listing_table(table)
}
/// A hive-partitioned table in object storage that is defined by a user-provided query.
pub trait Materialized: ListingTableLike {
/// The query that defines this materialized view.
fn query(&self) -> LogicalPlan;
}
/// Register a [`Materialized`] implementation in this registry.
/// This allows `cast_to_materialized` to easily downcast a [`TableProvider`]
/// into a `Materialized` where possible.
///
/// Note that this will also register `T` as a [`ListingTableLike`].
pub fn register_materialized<T: Materialized>() {
TABLE_TYPE_REGISTRY.register_materialized::<T>();
}
/// Attempt to cast the given TableProvider into a [`Materialized`].
/// If the table's type has not been registered using [`register_materialized`], will return `None`.
pub fn cast_to_materialized(table: &dyn TableProvider) -> Option<&dyn Materialized> {
TABLE_TYPE_REGISTRY.cast_to_materialized(table)
}
type Downcaster<T> = Arc<dyn Fn(&dyn Any) -> Option<&T> + Send + Sync>;
/// A registry for implementations of library-defined traits, used for downcasting
/// arbitrary TableProviders into `ListingTableLike` and `Materialized` trait objects where possible.
///
/// This is used throughout the crate as a singleton to store all known implementations of `ListingTableLike` and `Materialized`.
/// By default, [`ListingTable`] is registered as a `ListingTableLike`.
struct TableTypeRegistry {
listing_table_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn ListingTableLike>)>,
materialized_accessors: DashMap<TypeId, (&'static str, Downcaster<dyn Materialized>)>,
}
impl Debug for TableTypeRegistry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("TableTypeRegistry")
.field(
"listing_table_accessors",
&self
.listing_table_accessors
.iter()
.map(|r| r.value().0)
.collect_vec(),
)
.finish()
}
}
impl Default for TableTypeRegistry {
fn default() -> Self {
let new = Self {
listing_table_accessors: DashMap::new(),
materialized_accessors: DashMap::new(),
};
new.register_listing_table::<ListingTable>();
new
}
}
impl TableTypeRegistry {
fn register_listing_table<T: ListingTableLike>(&self) {
self.listing_table_accessors.insert(
TypeId::of::<T>(),
(
type_name::<T>(),
Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn ListingTableLike)),
),
);
}
fn register_materialized<T: Materialized>(&self) {
self.materialized_accessors.insert(
TypeId::of::<T>(),
(
type_name::<T>(),
Arc::new(|any| any.downcast_ref::<T>().map(|t| t as &dyn Materialized)),
),
);
self.register_listing_table::<T>();
}
fn cast_to_listing_table<'a>(
&'a self,
table: &'a dyn TableProvider,
) -> Option<&'a dyn ListingTableLike> {
self.listing_table_accessors
.get(&table.as_any().type_id())
.and_then(|r| r.value().1(table.as_any()))
}
fn cast_to_materialized<'a>(
&'a self,
table: &'a dyn TableProvider,
) -> Option<&'a dyn Materialized> {
self.materialized_accessors
.get(&table.as_any().type_id())
.and_then(|r| r.value().1(table.as_any()))
}
}