// datafusion_federation/plan_node.rs

1use core::fmt;
2use std::{
3    fmt::Debug,
4    hash::{Hash, Hasher},
5    sync::Arc,
6};
7
8use async_trait::async_trait;
9use datafusion::{
10    common::DFSchemaRef,
11    error::{DataFusionError, Result},
12    execution::context::{QueryPlanner, SessionState},
13    logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNodeCore},
14    physical_plan::ExecutionPlan,
15    physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner},
16};
17
18pub struct FederatedPlanNode {
19    pub plan: LogicalPlan,
20    pub planner: Arc<dyn FederationPlanner>,
21}
22
23impl FederatedPlanNode {
24    pub fn new(plan: LogicalPlan, planner: Arc<dyn FederationPlanner>) -> Self {
25        Self { plan, planner }
26    }
27
28    pub fn plan(&self) -> &LogicalPlan {
29        &self.plan
30    }
31}
32
33impl Debug for FederatedPlanNode {
34    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
35        UserDefinedLogicalNodeCore::fmt_for_explain(self, f)
36    }
37}
38
39impl UserDefinedLogicalNodeCore for FederatedPlanNode {
40    fn name(&self) -> &str {
41        "Federated"
42    }
43
44    fn inputs(&self) -> Vec<&LogicalPlan> {
45        Vec::new()
46    }
47
48    fn schema(&self) -> &DFSchemaRef {
49        self.plan.schema()
50    }
51
52    fn expressions(&self) -> Vec<Expr> {
53        Vec::new()
54    }
55
56    fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
57        write!(f, "Federated\n {}", self.plan)
58    }
59
60    fn with_exprs_and_inputs(&self, exprs: Vec<Expr>, inputs: Vec<LogicalPlan>) -> Result<Self> {
61        if !inputs.is_empty() {
62            return Err(DataFusionError::Plan("input size inconsistent".into()));
63        }
64        if !exprs.is_empty() {
65            return Err(DataFusionError::Plan("expression size inconsistent".into()));
66        }
67
68        Ok(Self {
69            plan: self.plan.clone(),
70            planner: self.planner.clone(),
71        })
72    }
73}
74
/// Stateless query planner that can physically plan [`FederatedPlanNode`]s.
#[derive(Default, Debug)]
pub struct FederatedQueryPlanner {}

impl FederatedQueryPlanner {
    /// Creates a planner; identical to `FederatedQueryPlanner::default()`.
    pub fn new() -> Self {
        FederatedQueryPlanner {}
    }
}
83
84#[async_trait]
85impl QueryPlanner for FederatedQueryPlanner {
86    async fn create_physical_plan(
87        &self,
88        logical_plan: &LogicalPlan,
89        session_state: &SessionState,
90    ) -> Result<Arc<dyn ExecutionPlan>> {
91        // Get provider here?
92
93        let physical_planner =
94            DefaultPhysicalPlanner::with_extension_planners(vec![
95                Arc::new(FederatedPlanner::new()),
96            ]);
97        physical_planner
98            .create_physical_plan(logical_plan, session_state)
99            .await
100    }
101}
102
103#[async_trait]
104pub trait FederationPlanner: Send + Sync {
105    async fn plan_federation(
106        &self,
107        node: &FederatedPlanNode,
108        session_state: &SessionState,
109    ) -> Result<Arc<dyn ExecutionPlan>>;
110}
111
112impl std::fmt::Debug for dyn FederationPlanner {
113    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
114        write!(f, "FederationPlanner")
115    }
116}
117
118impl PartialEq<FederatedPlanNode> for FederatedPlanNode {
119    /// Comparing name, args and return_type
120    fn eq(&self, other: &FederatedPlanNode) -> bool {
121        self.plan == other.plan
122    }
123}
124
125impl PartialOrd<FederatedPlanNode> for FederatedPlanNode {
126    fn partial_cmp(&self, other: &FederatedPlanNode) -> Option<std::cmp::Ordering> {
127        self.plan.partial_cmp(&other.plan)
128    }
129}
130
131impl Eq for FederatedPlanNode {}
132
133impl Hash for FederatedPlanNode {
134    fn hash<H: Hasher>(&self, state: &mut H) {
135        self.plan.hash(state);
136    }
137}
138
/// Stateless extension planner that turns [`FederatedPlanNode`]s into physical plans by
/// delegating to each node's [`FederationPlanner`].
// `Debug` is derived for consistency with `FederatedQueryPlanner`; both are public planner types.
#[derive(Default, Debug)]
pub struct FederatedPlanner {}

impl FederatedPlanner {
    /// Creates a planner; identical to `FederatedPlanner::default()`.
    pub fn new() -> Self {
        Self::default()
    }
}
147
148#[async_trait]
149impl ExtensionPlanner for FederatedPlanner {
150    async fn plan_extension(
151        &self,
152        _planner: &dyn PhysicalPlanner,
153        node: &dyn UserDefinedLogicalNode,
154        logical_inputs: &[&LogicalPlan],
155        physical_inputs: &[Arc<dyn ExecutionPlan>],
156        session_state: &SessionState,
157    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
158        let dc_node = node.as_any().downcast_ref::<FederatedPlanNode>();
159        if let Some(fed_node) = dc_node {
160            if !logical_inputs.is_empty() || !physical_inputs.is_empty() {
161                return Err(DataFusionError::Plan(
162                    "Inconsistent number of inputs".into(),
163                ));
164            }
165
166            let fed_planner = Arc::clone(&fed_node.planner);
167            let exec_plan = fed_planner.plan_federation(fed_node, session_state).await?;
168            return Ok(Some(exec_plan));
169        }
170        Ok(None)
171    }
172}