datafusion_federation/sql/
executor.rs1use async_trait::async_trait;
2use core::fmt;
3use datafusion::{
4 arrow::datatypes::SchemaRef,
5 common::Statistics,
6 error::Result,
7 logical_expr::LogicalPlan,
8 physical_plan::{metrics::MetricsSet, PhysicalExpr, SendableRecordBatchStream},
9 sql::{sqlparser::ast, unparser::dialect::Dialect},
10};
11use std::sync::Arc;
12
13pub type SQLExecutorRef = Arc<dyn SQLExecutor>;
14pub type AstAnalyzer = Box<dyn FnMut(ast::Statement) -> Result<ast::Statement>>;
15pub type LogicalOptimizer = Box<dyn FnMut(LogicalPlan) -> Result<LogicalPlan>>;
16pub type SqlQueryRewriter = Box<dyn FnMut(String) -> Result<String>>;
17
18#[async_trait]
19pub trait SQLExecutor: Sync + Send {
20 fn name(&self) -> &str;
22
23 fn compute_context(&self) -> Option<String>;
31
32 fn dialect(&self) -> Arc<dyn Dialect>;
34
35 fn logical_optimizer(&self) -> Option<LogicalOptimizer> {
37 None
38 }
39
40 fn ast_analyzer(&self) -> Option<AstAnalyzer> {
42 None
43 }
44
45 fn execute(
52 &self,
53 query: &str,
54 schema: SchemaRef,
55 filters: &[Arc<dyn PhysicalExpr>],
56 ) -> Result<SendableRecordBatchStream>;
57
58 async fn statistics(&self, plan: &LogicalPlan) -> Result<Statistics> {
62 Ok(Statistics::new_unknown(plan.schema().as_arrow()))
63 }
64
65 async fn table_names(&self) -> Result<Vec<String>>;
67
68 async fn get_table_schema(&self, table_name: &str) -> Result<SchemaRef>;
70
71 fn metrics(&self) -> Option<MetricsSet> {
73 None
74 }
75}
76
77impl fmt::Debug for dyn SQLExecutor {
78 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
79 write!(f, "{} {:?}", self.name(), self.compute_context())
80 }
81}
82
83impl fmt::Display for dyn SQLExecutor {
84 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
85 write!(f, "{} {:?}", self.name(), self.compute_context())
86 }
87}