datafusion_federation/
plan_node.rs1use core::fmt;
2use std::{
3 fmt::Debug,
4 hash::{Hash, Hasher},
5 sync::Arc,
6};
7
8use async_trait::async_trait;
9use datafusion::{
10 common::DFSchemaRef,
11 error::{DataFusionError, Result},
12 execution::context::{QueryPlanner, SessionState},
13 logical_expr::{Expr, LogicalPlan, UserDefinedLogicalNode, UserDefinedLogicalNodeCore},
14 physical_plan::ExecutionPlan,
15 physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner},
16};
17
18pub struct FederatedPlanNode {
19 pub plan: LogicalPlan,
20 pub planner: Arc<dyn FederationPlanner>,
21}
22
23impl FederatedPlanNode {
24 pub fn new(plan: LogicalPlan, planner: Arc<dyn FederationPlanner>) -> Self {
25 Self { plan, planner }
26 }
27
28 pub fn plan(&self) -> &LogicalPlan {
29 &self.plan
30 }
31}
32
33impl Debug for FederatedPlanNode {
34 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
35 UserDefinedLogicalNodeCore::fmt_for_explain(self, f)
36 }
37}
38
39impl UserDefinedLogicalNodeCore for FederatedPlanNode {
40 fn name(&self) -> &str {
41 "Federated"
42 }
43
44 fn inputs(&self) -> Vec<&LogicalPlan> {
45 Vec::new()
46 }
47
48 fn schema(&self) -> &DFSchemaRef {
49 self.plan.schema()
50 }
51
52 fn expressions(&self) -> Vec<Expr> {
53 Vec::new()
54 }
55
56 fn fmt_for_explain(&self, f: &mut fmt::Formatter) -> fmt::Result {
57 write!(f, "Federated\n {}", self.plan)
58 }
59
60 fn with_exprs_and_inputs(&self, exprs: Vec<Expr>, inputs: Vec<LogicalPlan>) -> Result<Self> {
61 if !inputs.is_empty() {
62 return Err(DataFusionError::Plan("input size inconsistent".into()));
63 }
64 if !exprs.is_empty() {
65 return Err(DataFusionError::Plan("expression size inconsistent".into()));
66 }
67
68 Ok(Self {
69 plan: self.plan.clone(),
70 planner: self.planner.clone(),
71 })
72 }
73}
74
75#[derive(Default, Debug)]
76pub struct FederatedQueryPlanner {}
77
78impl FederatedQueryPlanner {
79 pub fn new() -> Self {
80 Self::default()
81 }
82}
83
84#[async_trait]
85impl QueryPlanner for FederatedQueryPlanner {
86 async fn create_physical_plan(
87 &self,
88 logical_plan: &LogicalPlan,
89 session_state: &SessionState,
90 ) -> Result<Arc<dyn ExecutionPlan>> {
91 let physical_planner =
94 DefaultPhysicalPlanner::with_extension_planners(vec![
95 Arc::new(FederatedPlanner::new()),
96 ]);
97 physical_planner
98 .create_physical_plan(logical_plan, session_state)
99 .await
100 }
101}
102
103#[async_trait]
104pub trait FederationPlanner: Send + Sync {
105 async fn plan_federation(
106 &self,
107 node: &FederatedPlanNode,
108 session_state: &SessionState,
109 ) -> Result<Arc<dyn ExecutionPlan>>;
110}
111
112impl std::fmt::Debug for dyn FederationPlanner {
113 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
114 write!(f, "FederationPlanner")
115 }
116}
117
118impl PartialEq<FederatedPlanNode> for FederatedPlanNode {
119 fn eq(&self, other: &FederatedPlanNode) -> bool {
121 self.plan == other.plan
122 }
123}
124
125impl PartialOrd<FederatedPlanNode> for FederatedPlanNode {
126 fn partial_cmp(&self, other: &FederatedPlanNode) -> Option<std::cmp::Ordering> {
127 self.plan.partial_cmp(&other.plan)
128 }
129}
130
131impl Eq for FederatedPlanNode {}
132
133impl Hash for FederatedPlanNode {
134 fn hash<H: Hasher>(&self, state: &mut H) {
135 self.plan.hash(state);
136 }
137}
138
139#[derive(Default)]
140pub struct FederatedPlanner {}
141
142impl FederatedPlanner {
143 pub fn new() -> Self {
144 Self::default()
145 }
146}
147
148#[async_trait]
149impl ExtensionPlanner for FederatedPlanner {
150 async fn plan_extension(
151 &self,
152 _planner: &dyn PhysicalPlanner,
153 node: &dyn UserDefinedLogicalNode,
154 logical_inputs: &[&LogicalPlan],
155 physical_inputs: &[Arc<dyn ExecutionPlan>],
156 session_state: &SessionState,
157 ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
158 let dc_node = node.as_any().downcast_ref::<FederatedPlanNode>();
159 if let Some(fed_node) = dc_node {
160 if !logical_inputs.is_empty() || !physical_inputs.is_empty() {
161 return Err(DataFusionError::Plan(
162 "Inconsistent number of inputs".into(),
163 ));
164 }
165
166 let fed_planner = Arc::clone(&fed_node.planner);
167 let exec_plan = fed_planner.plan_federation(fed_node, session_state).await?;
168 return Ok(Some(exec_plan));
169 }
170 Ok(None)
171 }
172}