datafusion_federation/sql/
executor.rs

1use async_trait::async_trait;
2use core::fmt;
3use datafusion::{
4    arrow::datatypes::SchemaRef,
5    error::Result,
6    logical_expr::LogicalPlan,
7    physical_plan::SendableRecordBatchStream,
8    sql::{sqlparser::ast, unparser::dialect::Dialect},
9};
10use std::sync::Arc;
11
12pub type SQLExecutorRef = Arc<dyn SQLExecutor>;
13pub type AstAnalyzer = Box<dyn FnMut(ast::Statement) -> Result<ast::Statement>>;
14pub type LogicalOptimizer = Box<dyn FnMut(LogicalPlan) -> Result<LogicalPlan>>;
15
16#[async_trait]
17pub trait SQLExecutor: Sync + Send {
18    /// Executor name
19    fn name(&self) -> &str;
20
21    /// Executor compute context allows differentiating the remote compute context
22    /// such as authorization or active database.
23    ///
24    /// Note: returning None here may cause incorrect federation with other providers of the
25    /// same name that also have a compute_context of None.
26    /// Instead try to return a unique string that will never match any other
27    /// provider's context.
28    fn compute_context(&self) -> Option<String>;
29
30    /// The specific SQL dialect (currently supports 'sqlite', 'postgres', 'flight')
31    fn dialect(&self) -> Arc<dyn Dialect>;
32
33    /// Returns the analyzer rule specific for this engine to modify the logical plan before execution
34    fn logical_optimizer(&self) -> Option<LogicalOptimizer> {
35        None
36    }
37
38    /// Returns an AST analyzer specific for this engine to modify the AST before execution
39    fn ast_analyzer(&self) -> Option<AstAnalyzer> {
40        None
41    }
42
43    /// Execute a SQL query
44    fn execute(&self, query: &str, schema: SchemaRef) -> Result<SendableRecordBatchStream>;
45
46    /// Returns the tables provided by the remote
47    async fn table_names(&self) -> Result<Vec<String>>;
48
49    /// Returns the schema of table_name within this [`SQLExecutor`]
50    async fn get_table_schema(&self, table_name: &str) -> Result<SchemaRef>;
51}
52
53impl fmt::Debug for dyn SQLExecutor {
54    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
55        write!(f, "{} {:?}", self.name(), self.compute_context())
56    }
57}
58
59impl fmt::Display for dyn SQLExecutor {
60    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
61        write!(f, "{} {:?}", self.name(), self.compute_context())
62    }
63}