datafusion_iceberg_sql/
context.rs1use std::{collections::HashMap, sync::Arc};
2
3use arrow_schema::DataType;
4use datafusion_common::{config::ConfigOptions, DataFusionError, TableReference};
5use datafusion_execution::FunctionRegistry;
6use datafusion_expr::{
7 registry::MemoryFunctionRegistry, AggregateUDF, ScalarUDF, TableSource, WindowUDF,
8};
9use datafusion_sql::planner::ContextProvider;
10use iceberg_rust::catalog::{identifier::Identifier, CatalogList};
11
12use crate::IcebergTableSource;
13
14pub struct IcebergContext {
15 sources: HashMap<String, Arc<dyn TableSource>>,
16 config_options: ConfigOptions,
17 function_registry: MemoryFunctionRegistry,
18}
19
20impl IcebergContext {
21 pub async fn new(
22 tables: &[(String, String, String)],
23 catalogs: Arc<dyn CatalogList>,
24 branch: Option<&str>,
25 ) -> Result<IcebergContext, DataFusionError> {
26 let mut sources = HashMap::new();
27
28 for (catalog_name, namespace, name) in tables {
29 let catalog = catalogs
30 .catalog(catalog_name)
31 .ok_or(DataFusionError::Internal(format!(
32 "Catalog {} was not provided",
33 &catalog_name
34 )))?;
35
36 let tabular = catalog
37 .clone()
38 .load_tabular(
39 &Identifier::try_new(&[namespace.to_owned(), name.to_owned()], None)
40 .map_err(|err| DataFusionError::External(Box::new(err)))?,
41 )
42 .await
43 .map_err(|err| DataFusionError::External(Box::new(err)))?;
44
45 let table_source = IcebergTableSource::new(tabular, branch);
46
47 sources.insert(
48 catalog_name.to_owned() + "." + namespace + "." + name,
49 Arc::new(table_source) as Arc<dyn TableSource>,
50 );
51 }
52
53 let config_options = ConfigOptions::default();
54
55 let mut function_registry = MemoryFunctionRegistry::new();
56
57 datafusion_functions::register_all(&mut function_registry)?;
58 datafusion_functions_aggregate::register_all(&mut function_registry)?;
59
60 Ok(IcebergContext {
61 sources,
62 config_options,
63 function_registry,
64 })
65 }
66}
67
68impl ContextProvider for IcebergContext {
69 fn get_table_source(
70 &self,
71 name: TableReference,
72 ) -> Result<Arc<dyn TableSource>, DataFusionError> {
73 match name {
74 TableReference::Full {
75 catalog,
76 schema,
77 table,
78 } => self
79 .sources
80 .get(&(catalog.to_string() + "." + &schema + "." + &table))
81 .cloned()
82 .ok_or(DataFusionError::Internal(format!(
83 "Couldn't resolve table reference {}.{}",
84 &schema, &table
85 ))),
86 _ => Err(DataFusionError::Internal(
87 "Only partial table refence supported".to_string(),
88 )),
89 }
90 }
91 fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
92 self.function_registry.udf(name).ok()
93 }
94 fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
95 None
96 }
97 fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
98 self.function_registry.udaf(name).ok()
99 }
100 fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
101 self.function_registry.udwf(name).ok()
102 }
103 fn options(&self) -> &ConfigOptions {
104 &self.config_options
105 }
106
107 fn udf_names(&self) -> Vec<String> {
108 self.function_registry.udfs().into_iter().collect()
109 }
110
111 fn udaf_names(&self) -> Vec<String> {
112 Vec::new()
113 }
114
115 fn udwf_names(&self) -> Vec<String> {
116 Vec::new()
117 }
118}