datafusion_kql/
session.rs1use datafusion::arrow::datatypes::DataType;
2use datafusion::config::ConfigOptions;
3use datafusion::dataframe::DataFrame;
4use datafusion::datasource::DefaultTableSource;
5use datafusion::execution::SessionState;
6use datafusion::execution::context::SessionContext;
7
8use datafusion_common::{not_impl_err, plan_datafusion_err, DataFusionError, ResolvedTableReference, Result, TableReference};
9
10use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, WindowUDF};
11use datafusion_expr::planner::ContextProvider;
12use datafusion_expr::registry::FunctionRegistry;
13
14use kqlparser::ast::Statement;
15use kqlparser::parser::parse;
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use crate::planner::KqlToRel;
21
22#[allow(async_fn_in_trait)]
23pub trait SessionContextExt {
24 async fn kql(&self, sql: &str) -> Result<DataFrame>;
25}
26
27#[allow(async_fn_in_trait)]
28pub trait SessionStateExt {
29 async fn create_logical_plan_kql(&self, kql: &str) -> Result<LogicalPlan>;
30 fn kql_to_statement(&self, kql: &str) -> Result<Statement>;
31 async fn kql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan>;
32}
33
34impl SessionContextExt for SessionContext {
35 async fn kql(&self, kql: &str) -> Result<DataFrame> {
36 let plan = self.state().create_logical_plan_kql(kql).await?;
37 self.execute_logical_plan(plan).await
38 }
39}
40
41impl SessionStateExt for SessionState {
42 async fn create_logical_plan_kql(&self, kql: &str) -> Result<LogicalPlan> {
43 let statement = self.kql_to_statement(kql)?;
45 let plan = self.kql_statement_to_plan(statement).await?;
46 Ok(plan)
47 }
48
49 fn kql_to_statement(&self, kql: &str) -> Result<Statement> {
50 let mut statements = parse(kql).unwrap().1;
51 if statements.len() > 1 {
52 return not_impl_err!(
53 "The context currently only supports a single KQL statement"
54 )
55 }
56 Ok(statements.pop().ok_or_else(|| {
57 plan_datafusion_err!("No KQL statements were provided in the query string")
58 })?)
59 }
60
61 async fn kql_statement_to_plan(&self, statement: Statement) -> Result<LogicalPlan> {
62 let mut provider = SessionContextProvider {
63 state: self,
64 tables: HashMap::with_capacity(10),
65 };
66
67 let catalog_list = self.catalog_list();
68 for catalog in catalog_list.catalog_names() {
69 let schema_list = catalog_list.catalog(&catalog).ok_or_else(|| DataFusionError::Plan(format!("Catalog '{catalog}' not found")))?;
70 for schema in schema_list.schema_names() {
71 let table_list = schema_list.schema(&schema).ok_or_else(|| DataFusionError::Plan(format!("Schema '{schema}' not found")))?;
72 for table in table_list.table_names() {
73 let resolved_ref = ResolvedTableReference {
74 catalog: Arc::from(Box::from(catalog.clone())),
75 schema: Arc::from(Box::from(schema.clone())),
76 table: Arc::from(Box::from(table.clone()))
77 };
78 let table_provider = table_list.table(&table).await?.ok_or_else(|| DataFusionError::Plan(format!("Table '{table}' not found")))?;
79 provider.tables.insert(resolved_ref.to_string(), Arc::new(DefaultTableSource::new(table_provider)));
80 }
82 }
83 }
84
85 let kql = KqlToRel::new(&provider);
86 match statement {
87 Statement::TabularExpression(query) => kql.query_to_plan(&query),
88 _ => not_impl_err!("Statement type not supported")
89 }
90 }
91}
92
93struct SessionContextProvider<'a> {
94 state: &'a SessionState,
95 tables: HashMap<String, Arc<dyn TableSource>>,
96}
97
98impl<'a> ContextProvider for SessionContextProvider<'a> {
99 fn get_table_source(&self, name: TableReference) -> Result<Arc<dyn TableSource>> {
100 let catalog = &self.state.config_options().catalog;
101 let name = name.resolve(&catalog.default_catalog, &catalog.default_schema).to_string();
102 self.tables.get(&name).cloned().ok_or_else(|| DataFusionError::Plan(format!("Table '{}' not found", name)))
103 }
104
105 fn get_function_meta(&self, name: &str) -> Option<Arc<ScalarUDF>> {
106 self.state.udf(name).ok()
107 }
108
109 fn get_aggregate_meta(&self, name: &str) -> Option<Arc<AggregateUDF>> {
110 self.state.udaf(name).ok()
111 }
112
113 fn get_variable_type(&self, _variable_names: &[String]) -> Option<DataType> {
114 None
115 }
116
117 fn get_window_meta(&self, name: &str) -> Option<Arc<WindowUDF>> {
118 self.state.udwf(name).ok()
119 }
120
121 fn options(&self) -> &ConfigOptions {
122 !unimplemented!()
123 }
124
125 fn udf_names(&self) -> Vec<String> {
126 self.state.scalar_functions().keys().cloned().collect()
127 }
128
129 fn udaf_names(&self) -> Vec<String> {
130 self.state.aggregate_functions().keys().cloned().collect()
131 }
132
133 fn udwf_names(&self) -> Vec<String> {
134 self.state.window_functions().keys().cloned().collect()
135 }
136}