datafusion_federation/
lib.rs

1mod optimizer;
2mod plan_node;
3pub mod schema_cast;
4#[cfg(feature = "sql")]
5pub mod sql;
6mod table_provider;
7
8use std::{
9    fmt,
10    hash::{Hash, Hasher},
11    sync::Arc,
12};
13
14use datafusion::{
15    execution::session_state::{SessionState, SessionStateBuilder},
16    optimizer::{optimizer::Optimizer, OptimizerRule},
17};
18
19pub use optimizer::{get_table_source, FederationOptimizerRule};
20pub use plan_node::{
21    FederatedPlanNode, FederatedPlanner, FederatedQueryPlanner, FederationPlanner,
22};
23pub use table_provider::{FederatedTableProviderAdaptor, FederatedTableSource};
24
25pub fn default_session_state() -> SessionState {
26    let rules = default_optimizer_rules();
27    SessionStateBuilder::new()
28        .with_optimizer_rules(rules)
29        .with_query_planner(Arc::new(FederatedQueryPlanner::new()))
30        .with_default_features()
31        .build()
32}
33
34pub fn default_optimizer_rules() -> Vec<Arc<dyn OptimizerRule + Send + Sync>> {
35    // Get the default optimizer
36    let df_default = Optimizer::new();
37    let mut default_rules = df_default.rules;
38
39    // Insert the FederationOptimizerRule after the ScalarSubqueryToJoin.
40    // This ensures ScalarSubquery are replaced before we try to federate.
41    let Some(pos) = default_rules
42        .iter()
43        .position(|x| x.name() == "scalar_subquery_to_join")
44    else {
45        panic!("Could not locate ScalarSubqueryToJoin");
46    };
47
48    // TODO: check if we should allow other optimizers to run before the federation rule.
49
50    let federation_rule = Arc::new(FederationOptimizerRule::new());
51    default_rules.insert(pos + 1, federation_rule);
52
53    default_rules
54}
55
/// Reference-counted handle to a [`FederationProvider`] trait object.
pub type FederationProviderRef = Arc<dyn FederationProvider>;
/// A provider that part of a query plan can be federated to.
///
/// The `PartialEq` and `Hash` implementations for `dyn FederationProvider`
/// in this file are based on [`name`](FederationProvider::name) and
/// [`compute_context`](FederationProvider::compute_context) only, so those
/// two values together identify a provider.
pub trait FederationProvider: Send + Sync {
    /// Returns the name of the provider, used for comparison.
    fn name(&self) -> &str;

    /// Returns the compute context in which this federation provider
    /// will execute a query. For example: database instance & catalog.
    fn compute_context(&self) -> Option<String>;

    /// Returns an optimizer that can cut out part of the plan
    /// to federate it.
    fn optimizer(&self) -> Option<Arc<Optimizer>>;
}
69
70impl fmt::Display for dyn FederationProvider {
71    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
72        write!(f, "{} {:?}", self.name(), self.compute_context())
73    }
74}
75
76impl PartialEq<dyn FederationProvider> for dyn FederationProvider {
77    /// Comparing name, args and return_type
78    fn eq(&self, other: &dyn FederationProvider) -> bool {
79        self.name() == other.name() && self.compute_context() == other.compute_context()
80    }
81}
82
83impl Hash for dyn FederationProvider {
84    fn hash<H: Hasher>(&self, state: &mut H) {
85        self.name().hash(state);
86        self.compute_context().hash(state);
87    }
88}
89
// Marker impl: the `PartialEq` implementation above compares a `&str` name and
// an `Option<String>` compute context, both of which have total equality.
impl Eq for dyn FederationProvider {}