// datafusion_kql/session.rs

1use datafusion::arrow::datatypes::DataType;
2use datafusion::config::ConfigOptions;
3use datafusion::dataframe::DataFrame;
4use datafusion::datasource::DefaultTableSource;
5use datafusion::execution::SessionState;
6use datafusion::execution::context::SessionContext;
7
8use datafusion_common::{not_impl_err, plan_datafusion_err, DataFusionError, ResolvedTableReference, Result, TableReference};
9
10use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, WindowUDF};
11use datafusion_expr::planner::ContextProvider;
12use datafusion_expr::registry::FunctionRegistry;
13
14use kqlparser::ast::Statement;
15use kqlparser::parser::parse;
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use crate::planner::KqlToRel;
21
22#[allow(async_fn_in_trait)]
23pub trait SessionContextExt {
24    async fn kql(&self, sql: &str) -> Result<DataFrame>;
25}
26
27#[allow(async_fn_in_trait)]
28pub trait SessionStateExt {
29    async fn create_logical_plan_kql(&self, kql: &str) -> Result<LogicalPlan>;
30    fn kql_to_statement(&self, kql: &str) -> Result<Statement>;
31    async fn kql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan>;
32}
33
34impl SessionContextExt for SessionContext {
35    async fn kql(&self, kql: &str) -> Result<DataFrame> {
36        let plan = self.state().create_logical_plan_kql(kql).await?;
37        self.execute_logical_plan(plan).await
38    }
39}
40
41impl SessionStateExt for SessionState {
42    async fn create_logical_plan_kql(&self, kql: &str) -> Result<LogicalPlan> {
43        //let dialect = self.config.options().sql_parser.dialect.as_str();
44        let statement = self.kql_to_statement(kql)?;
45        let plan = self.kql_statement_to_plan(statement).await?;
46        Ok(plan)
47    }
48    
49    fn kql_to_statement(&self, kql: &str) -> Result<Statement> {
50        let mut statements = parse(kql).unwrap().1;
51        if statements.len() > 1 {
52            return not_impl_err!(
53                "The context currently only supports a single KQL statement"
54            )
55        }
56        Ok(statements.pop().ok_or_else(|| {
57            plan_datafusion_err!("No KQL statements were provided in the query string")
58        })?)
59    }
60    
61    async fn kql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
62        let mut provider = SessionContextProvider {
63            state: self,
64            tables: HashMap::with_capacity(10),
65        };
66
67        let catalog_list = self.catalog_list();
68        for catalog in catalog_list.catalog_names() {
69            let schema_list = catalog_list.catalog(&catalog).ok_or_else(|| DataFusionError::Plan(format!("Catalog '{catalog}' not found")))?;
70            for schema in schema_list.schema_names() {
71                let table_list = schema_list.schema(&schema).ok_or_else(|| DataFusionError::Plan(format!("Schema '{schema}' not found")))?;
72                for table in table_list.table_names() {
73                    let resolved_ref = ResolvedTableReference {
74                        catalog: Arc::from(Box::from(catalog.clone())),
75                        schema: Arc::from(Box::from(schema.clone())),
76                        table: Arc::from(Box::from(table.clone()))
77                    };
78                    let table_provider = table_list.table(&table).await?.ok_or_else(|| DataFusionError::Plan(format!("Table '{table}' not found")))?;
79                    provider.tables.insert(resolved_ref.to_string(), Arc::new(DefaultTableSource::new(table_provider)));
80                    //println!("Table: {}", resolved_ref.to_string());
81                }
82            }
83        }
84
85        let kql = KqlToRel::new(&provider);
86        match statement {
87            Statement::TabularExpression(query) => kql.query_to_plan(&query),
88            _ => not_impl_err!("Statement type not supported")
89        }
90    }
91}
92
93struct SessionContextProvider<'a> {
94    state: &'a SessionState,
95    tables: HashMap<String, Arc<dyn TableSource>>,
96}
97
98impl<'a> ContextProvider for SessionContextProvider<'a> {
99    fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
100        let catalog = &self.state.config_options().catalog;
101        let name = name.resolve(&catalog.default_catalog, &catalog.default_schema).to_string();
102        self.tables.get(&name).cloned().ok_or_else(|| DataFusionError::Plan(format!("Table '{}' not found", name)))
103    }
104
105    fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
106        self.state.udf(name).ok()
107    }
108
109    fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
110        self.state.udaf(name).ok()
111    }
112
113    fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
114        None
115    }
116    
117    fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
118        self.state.udwf(name).ok()
119    }
120    
121    fn options(&self) -> &ConfigOptions {
122        !unimplemented!()
123    }
124    
125    fn udf_names(&self) -> Vec<String> {
126        self.state.scalar_functions().keys().cloned().collect()
127    }
128    
129    fn udaf_names(&self) -> Vec<String> {
130        self.state.aggregate_functions().keys().cloned().collect()
131    }
132    
133    fn udwf_names(&self) -> Vec<String> {
134        self.state.window_functions().keys().cloned().collect()
135    }
136}