datafusion_distributed/coordinator/
distributed.rs1use crate::common::{require_one_child, serialize_uuid};
2use crate::coordinator::metrics_store::MetricsStore;
3use crate::coordinator::prepare_static_plan::prepare_static_plan;
4use crate::distributed_planner::NetworkBoundaryExt;
5use crate::worker::generated::worker::TaskKey;
6use datafusion::common::internal_datafusion_err;
7use datafusion::common::runtime::JoinSet;
8use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
9use datafusion::common::{Result, exec_err};
10use datafusion::execution::{SendableRecordBatchStream, TaskContext};
11use datafusion::physical_expr_common::metrics::MetricsSet;
12use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
13use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
14use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
15use futures::StreamExt;
16use std::fmt::Formatter;
17use std::sync::Arc;
18use std::sync::Mutex;
19
/// Execution node that wraps a plan, prepares it with `prepare_static_plan` when executed,
/// and streams the output of the resulting head stage.
///
/// Only a single output partition (index 0) is accepted by `execute`; any other index is
/// rejected with an execution error.
#[derive(Debug)]
pub struct DistributedExec {
    /// The wrapped plan; it is this node's only child.
    plan: Arc<dyn ExecutionPlan>,
    /// Head stage stored by the most recent call to `execute`, or `None` before that.
    /// Kept behind an `Arc` so nodes rebuilt through `with_new_children` share the same slot.
    prepared_plan: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
    /// Metrics for this node; handed to `prepare_static_plan` and reported by `metrics()`.
    metrics: ExecutionPlanMetricsSet,
    /// Per-task metrics store; `Some` only when enabled via `with_metrics_collection(true)`.
    pub(crate) metrics_store: Option<Arc<MetricsStore>>,
}
34
/// Output of `prepare_static_plan`: the plan whose results `DistributedExec::execute`
/// returns, together with the tasks that must be awaited alongside it.
pub(super) struct PreparedPlan {
    /// Plan executed for the requested partition; its batches become the node's output.
    pub(super) head_stage: Arc<dyn ExecutionPlan>,
    /// Tasks started during preparation. `execute` awaits all of them and surfaces the
    /// first `Err` they return through the output stream.
    pub(super) join_set: JoinSet<Result<()>>,
}
39
40impl DistributedExec {
41 pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
42 Self {
43 plan,
44 prepared_plan: Arc::new(Mutex::new(None)),
45 metrics: ExecutionPlanMetricsSet::new(),
46 metrics_store: None,
47 }
48 }
49
50 pub fn with_metrics_collection(mut self, enabled: bool) -> Self {
52 self.metrics_store = match enabled {
53 true => Some(Arc::new(MetricsStore::new())),
54 false => None,
55 };
56 self
57 }
58
59 pub async fn wait_for_metrics(&self) {
67 let mut expected_keys: Vec<TaskKey> = Vec::new();
68 let Some(task_metrics) = &self.metrics_store else {
69 return;
70 };
71 let _ = self.plan.apply(|plan| {
72 if let Some(boundary) = plan.as_network_boundary() {
73 let stage = boundary.input_stage();
74 for i in 0..stage.task_count() {
75 expected_keys.push(TaskKey {
76 query_id: serialize_uuid(&stage.query_id()),
77 stage_id: stage.num() as u64,
78 task_number: i as u64,
79 });
80 }
81 }
82 Ok(TreeNodeRecursion::Continue)
83 });
84 if expected_keys.is_empty() {
85 return;
86 }
87 let mut rx = task_metrics.rx.clone();
88 let _ = rx
89 .wait_for(|map| expected_keys.iter().all(|key| map.contains_key(key)))
90 .await;
91 }
92
93 pub(crate) fn prepared_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
97 self.prepared_plan
98 .lock()
99 .map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {}", e))?
100 .clone()
101 .ok_or_else(|| {
102 internal_datafusion_err!("No prepared plan found. Was execute() called?")
103 })
104 }
105}
106
107impl DisplayAs for DistributedExec {
108 fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
109 write!(f, "DistributedExec")
110 }
111}
112
113impl ExecutionPlan for DistributedExec {
114 fn name(&self) -> &str {
115 "DistributedExec"
116 }
117
118 fn properties(&self) -> &Arc<PlanProperties> {
119 self.plan.properties()
120 }
121
122 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
123 vec![&self.plan]
124 }
125
126 fn with_new_children(
127 self: Arc<Self>,
128 children: Vec<Arc<dyn ExecutionPlan>>,
129 ) -> Result<Arc<dyn ExecutionPlan>> {
130 Ok(Arc::new(DistributedExec {
131 plan: require_one_child(&children)?,
132 prepared_plan: self.prepared_plan.clone(),
133 metrics: self.metrics.clone(),
134 metrics_store: self.metrics_store.clone(),
135 }))
136 }
137
138 fn execute(
139 &self,
140 partition: usize,
141 context: Arc<TaskContext>,
142 ) -> Result<SendableRecordBatchStream> {
143 if partition > 0 {
144 return exec_err!(
149 "DistributedExec must only have 1 partition, but it was called with partition index {partition}"
150 );
151 }
152
153 let PreparedPlan {
154 head_stage,
155 join_set,
156 } = prepare_static_plan(&self.plan, &self.metrics, &self.metrics_store, &context)?;
157 {
158 let mut guard = self
159 .prepared_plan
160 .lock()
161 .map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {e}"))?;
162 *guard = Some(head_stage.clone());
163 }
164 let mut builder = RecordBatchReceiverStreamBuilder::new(self.schema(), 1);
165 let tx = builder.tx();
166 builder.spawn(async move {
168 let mut stream = head_stage.execute(partition, context)?;
169 while let Some(msg) = stream.next().await {
170 if tx.send(msg).await.is_err() {
171 break; }
173 }
174 Ok(())
175 });
176 builder.spawn(async move {
178 for res in join_set.join_all().await {
179 res?;
180 }
181 Ok(())
182 });
183 Ok(builder.build())
184 }
185
186 fn metrics(&self) -> Option<MetricsSet> {
187 Some(self.metrics.clone_inner())
188 }
189}