datafusion_iceberg_sql/
context.rs

1use std::{collections::HashMap, sync::Arc};
2
3use arrow_schema::DataType;
4use datafusion_common::{config::ConfigOptions, DataFusionError, TableReference};
5use datafusion_execution::FunctionRegistry;
6use datafusion_expr::{
7    registry::MemoryFunctionRegistry, AggregateUDF, ScalarUDF, TableSource, WindowUDF,
8};
9use datafusion_sql::planner::ContextProvider;
10use iceberg_rust::catalog::{identifier::Identifier, CatalogList};
11
12use crate::IcebergTableSource;
13
14pub struct IcebergContext {
15    sources: HashMap<String, Arc<dyn TableSource>>,
16    config_options: ConfigOptions,
17    function_registry: MemoryFunctionRegistry,
18}
19
20impl IcebergContext {
21    pub async fn new(
22        tables: &[(String, String, String)],
23        catalogs: Arc<dyn CatalogList>,
24        branch: Option<&str>,
25    ) -> Result<IcebergContext, DataFusionError> {
26        let mut sources = HashMap::new();
27
28        for (catalog_name, namespace, name) in tables {
29            let catalog = catalogs
30                .catalog(catalog_name)
31                .ok_or(DataFusionError::Internal(format!(
32                    "Catalog {} was not provided",
33                    &catalog_name
34                )))?;
35
36            let tabular = catalog
37                .clone()
38                .load_tabular(
39                    &Identifier::try_new(&[namespace.to_owned(), name.to_owned()], None)
40                        .map_err(|err| DataFusionError::External(Box::new(err)))?,
41                )
42                .await
43                .map_err(|err| DataFusionError::External(Box::new(err)))?;
44
45            let table_source = IcebergTableSource::new(tabular, branch);
46
47            sources.insert(
48                catalog_name.to_owned() + "." + namespace + "." + name,
49                Arc::new(table_source) as Arc<dyn TableSource>,
50            );
51        }
52
53        let config_options = ConfigOptions::default();
54
55        let mut function_registry = MemoryFunctionRegistry::new();
56
57        datafusion_functions::register_all(&mut function_registry)?;
58        datafusion_functions_aggregate::register_all(&mut function_registry)?;
59
60        Ok(IcebergContext {
61            sources,
62            config_options,
63            function_registry,
64        })
65    }
66}
67
68impl ContextProvider for IcebergContext {
69    fn get_table_source(
70        &self,
71        name: TableReference,
72    ) -> Result<Arc<dyn TableSource>, DataFusionError> {
73        match name {
74            TableReference::Full {
75                catalog,
76                schema,
77                table,
78            } => self
79                .sources
80                .get(&(catalog.to_string() + "." + &schema + "." + &table))
81                .cloned()
82                .ok_or(DataFusionError::Internal(format!(
83                    "Couldn't resolve table reference {}.{}",
84                    &schema, &table
85                ))),
86            _ => Err(DataFusionError::Internal(
87                "Only partial table refence supported".to_string(),
88            )),
89        }
90    }
91    fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
92        self.function_registry.udf(name).ok()
93    }
94    fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
95        None
96    }
97    fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
98        self.function_registry.udaf(name).ok()
99    }
100    fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
101        self.function_registry.udwf(name).ok()
102    }
103    fn options(&self) -> &ConfigOptions {
104        &self.config_options
105    }
106
107    fn udf_names(&self) -> Vec<String> {
108        self.function_registry.udfs().into_iter().collect()
109    }
110
111    fn udaf_names(&self) -> Vec<String> {
112        Vec::new()
113    }
114
115    fn udwf_names(&self) -> Vec<String> {
116        Vec::new()
117    }
118}