datafusion_federation/sql/
executor.rs1use async_trait::async_trait;
2use core::fmt;
3use datafusion::{
4 arrow::datatypes::SchemaRef,
5 common::Statistics,
6 error::Result,
7 logical_expr::LogicalPlan,
8 physical_plan::{metrics::MetricsSet, SendableRecordBatchStream},
9 sql::{sqlparser::ast, unparser::dialect::Dialect},
10};
11use std::sync::Arc;
12
13pub type SQLExecutorRef = Arc<dyn SQLExecutor>;
14pub type AstAnalyzer = Box<dyn FnMut(ast::Statement) -> Result<ast::Statement>>;
15pub type LogicalOptimizer = Box<dyn FnMut(LogicalPlan) -> Result<LogicalPlan>>;
16
17#[async_trait]
18pub trait SQLExecutor: Sync + Send {
19 fn name(&self) -> &str;
21
22 fn compute_context(&self) -> Option<String>;
30
31 fn dialect(&self) -> Arc<dyn Dialect>;
33
34 fn logical_optimizer(&self) -> Option<LogicalOptimizer> {
36 None
37 }
38
39 fn ast_analyzer(&self) -> Option<AstAnalyzer> {
41 None
42 }
43
44 fn execute(&self, query: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream>;
46
47 async fn statistics(&self, plan: &LogicalPlan) -> Result<Statistics> {
51 Ok(Statistics::new_unknown(plan.schema().as_arrow()))
52 }
53
54 async fn table_names(&self) -> Result<Vec<String>>;
56
57 async fn get_table_schema(&self, table_name: &str) -> Result<SchemaRef>;
59
60 fn metrics(&self) -> Option<MetricsSet> {
62 None
63 }
64}
65
66impl fmt::Debug for dyn SQLExecutor {
67 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
68 write!(f, "{} {:?}", self.name(), self.compute_context())
69 }
70}
71
72impl fmt::Display for dyn SQLExecutor {
73 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
74 write!(f, "{} {:?}", self.name(), self.compute_context())
75 }
76}