datafusion_distributed/execution_plans/
distributed.rs1use crate::common::require_one_child;
2use crate::config_extension_ext::get_config_extension_propagation_headers;
3use crate::distributed_planner::NetworkBoundaryExt;
4use crate::networking::get_distributed_worker_resolver;
5use crate::passthrough_headers::get_passthrough_headers;
6use crate::protobuf::{DistributedCodec, tonic_status_to_datafusion_error};
7use crate::stage::{ExecutionTask, Stage};
8use crate::worker::generated::worker::{
9 CoordinatorToWorkerMsg, SetPlanRequest, TaskKey, coordinator_to_worker_msg::Inner,
10};
11use crate::{
12 DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, WorkerResolver, get_distributed_channel_resolver,
13};
14use datafusion::common::instant::Instant;
15use datafusion::common::runtime::JoinSet;
16use datafusion::common::tree_node::{Transformed, TreeNode};
17use datafusion::common::{Result, exec_err, internal_err};
18use datafusion::common::{exec_datafusion_err, internal_datafusion_err};
19use datafusion::error::DataFusionError;
20use datafusion::execution::{SendableRecordBatchStream, TaskContext};
21use datafusion::physical_expr_common::metrics::MetricsSet;
22use datafusion::physical_plan::metrics::{
23 ExecutionPlanMetricsSet, Label, MetricBuilder, MetricValue, Time,
24};
25use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
26use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
27use datafusion_proto::physical_plan::AsExecutionPlan;
28use datafusion_proto::protobuf::PhysicalPlanNode;
29use futures::StreamExt;
30use http::Extensions;
31use prost::Message;
32use rand::Rng;
33use std::any::Any;
34use std::fmt::{Display, Formatter};
35use std::sync::Arc;
36use std::sync::Mutex;
37use std::sync::atomic::{AtomicU64, Ordering};
38use std::time::Duration;
39use tonic::Request;
40use tonic::metadata::MetadataMap;
41use url::Url;
42
43#[derive(Debug)]
50pub struct DistributedExec {
51 pub plan: Arc<dyn ExecutionPlan>,
52 pub prepared_plan: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
53 metrics: ExecutionPlanMetricsSet,
54}
55
56struct PreparedPlan {
57 plan: Arc<dyn ExecutionPlan>,
58 join_set: JoinSet<Result<()>>,
59}
60
61impl DistributedExec {
62 pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
63 Self {
64 plan,
65 prepared_plan: Arc::new(Mutex::new(None)),
66 metrics: ExecutionPlanMetricsSet::new(),
67 }
68 }
69
70 pub(crate) fn prepared_plan(&self) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
73 self.prepared_plan
74 .lock()
75 .map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {}", e))?
76 .clone()
77 .ok_or_else(|| {
78 internal_datafusion_err!("No prepared plan found. Was execute() called?")
79 })
80 }
81
82 fn prepare_plan(&self, ctx: &Arc<TaskContext>) -> Result<PreparedPlan> {
90 let worker_resolver = get_distributed_worker_resolver(ctx.session_config())?;
91 let codec = DistributedCodec::new_combined_with_user(ctx.session_config());
92
93 let urls = worker_resolver.get_urls()?;
94
95 let plan_bytes_sent = MetricBuilder::new(&self.metrics)
97 .with_label(Label::new(DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, "0"))
98 .global_counter("plan_bytes_sent");
99
100 let start = Instant::now();
102 let plan_send_latency = Arc::new(LatencyMetric::new(
103 "plan_send_latency",
104 |b| b.with_label(Label::new(DISTRIBUTED_DATAFUSION_TASK_ID_LABEL, "0")),
105 &self.metrics,
106 ));
107
108 let mut join_set = JoinSet::new();
109 let prepared = Arc::clone(&self.plan).transform_up(|plan| {
110 let Some(plan) = plan.as_network_boundary() else {
112 return Ok(Transformed::no(plan));
113 };
114
115 let stage = plan.input_stage();
116 let Some(input_plan) = &stage.plan else {
117 return internal_err!("Plan is not set for stage {}", stage.num);
118 };
119
120 let start_idx = rand::rng().random_range(0..urls.len());
122
123 let bytes = PhysicalPlanNode::try_from_physical_plan(Arc::clone(input_plan), &codec)?
127 .encode_to_vec();
128
129 let tasks = stage
130 .tasks
131 .iter()
132 .enumerate()
133 .map(|(i, _)| {
134 let url = urls[(start_idx + i) % urls.len()].clone();
135 let execution_task = ExecutionTask {
136 url: Some(url.clone()),
137 };
138 let request = SetPlanRequest {
139 plan_proto: bytes.clone(),
140 task_count: stage.tasks.len() as _,
141 task_key: Some(TaskKey {
142 query_id: stage.query_id.as_bytes().to_vec(),
143 stage_id: stage.num as _,
144 task_number: i as _,
145 }),
146 };
147 plan_bytes_sent.add(bytes.len());
148 let plan_send_latency = Arc::clone(&plan_send_latency);
149 let ctx = Arc::clone(ctx);
150 join_set.spawn(async move {
153 send_plan_task(ctx, url, request).await?;
154 plan_send_latency.record(&start);
155 Ok(())
156 });
157 execution_task
158 })
159 .collect::<Vec<_>>();
160
161 Ok(Transformed::yes(plan.with_input_stage(Stage {
162 query_id: stage.query_id,
163 num: stage.num,
164 plan: None,
165 tasks,
166 })?))
167 })?;
168 Ok(PreparedPlan {
169 plan: prepared.data,
170 join_set,
171 })
172 }
173}
174
175impl DisplayAs for DistributedExec {
176 fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
177 write!(f, "DistributedExec")
178 }
179}
180
181impl ExecutionPlan for DistributedExec {
182 fn name(&self) -> &str {
183 "DistributedExec"
184 }
185
186 fn as_any(&self) -> &dyn Any {
187 self
188 }
189
190 fn properties(&self) -> &Arc<PlanProperties> {
191 self.plan.properties()
192 }
193
194 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
195 vec![&self.plan]
196 }
197
198 fn with_new_children(
199 self: Arc<Self>,
200 children: Vec<Arc<dyn ExecutionPlan>>,
201 ) -> Result<Arc<dyn ExecutionPlan>> {
202 Ok(Arc::new(DistributedExec {
203 plan: require_one_child(&children)?,
204 prepared_plan: self.prepared_plan.clone(),
205 metrics: self.metrics.clone(),
206 }))
207 }
208
209 fn execute(
210 &self,
211 partition: usize,
212 context: Arc<TaskContext>,
213 ) -> Result<SendableRecordBatchStream> {
214 if partition > 0 {
215 return exec_err!(
220 "DistributedExec must only have 1 partition, but it was called with partition index {partition}"
221 );
222 }
223
224 let PreparedPlan { plan, join_set } = self.prepare_plan(&context)?;
225 {
226 let mut guard = self
227 .prepared_plan
228 .lock()
229 .map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {e}"))?;
230 *guard = Some(plan.clone());
231 }
232 let mut builder = RecordBatchReceiverStreamBuilder::new(self.schema(), 1);
233 let tx = builder.tx();
234 builder.spawn(async move {
236 let mut stream = plan.execute(partition, context)?;
237 while let Some(msg) = stream.next().await {
238 if tx.send(msg).await.is_err() {
239 break; }
241 }
242 Ok(())
243 });
244 builder.spawn(async move {
246 for res in join_set.join_all().await {
247 res?;
248 }
249 Ok(())
250 });
251 Ok(builder.build())
252 }
253
254 fn metrics(&self) -> Option<MetricsSet> {
255 Some(self.metrics.clone_inner())
256 }
257}
258
259async fn send_plan_task(ctx: Arc<TaskContext>, url: Url, request: SetPlanRequest) -> Result<()> {
260 let channel_resolver = get_distributed_channel_resolver(ctx.as_ref());
261 let mut client = channel_resolver.get_worker_client_for_url(&url).await?;
262
263 let mut headers = get_config_extension_propagation_headers(ctx.session_config())?;
264 headers.extend(get_passthrough_headers(ctx.session_config()));
265
266 let msg = CoordinatorToWorkerMsg {
267 inner: Some(Inner::SetPlanRequest(request)),
268 };
269 let request = Request::from_parts(
270 MetadataMap::from_headers(headers),
271 Extensions::default(),
272 futures::stream::once(async { msg }),
273 );
274
275 client.coordinator_channel(request).await.map_err(|e| {
276 tonic_status_to_datafusion_error(&e)
277 .unwrap_or_else(|| exec_datafusion_err!("Error sending plan to worker {url}: {e}"))
278 })?;
279 Ok(())
280}
281
282struct LatencyMetric {
285 max: Time,
286 avg: Time,
287 max_latency_micros: AtomicU64,
288 sum_latency_micros: AtomicU64,
289 count_latency_micros: AtomicU64,
290}
291
292impl Drop for LatencyMetric {
293 fn drop(&mut self) {
294 self.max.add_duration(Duration::from_micros(
295 self.max_latency_micros.load(Ordering::Relaxed),
296 ));
297 self.avg.add_duration(Duration::from_micros(
298 self.sum_latency_micros.load(Ordering::Relaxed)
299 / self.count_latency_micros.load(Ordering::Relaxed).max(1),
300 ));
301 }
302}
303
304impl LatencyMetric {
305 fn new(
306 name: impl Display,
307 builder: impl Fn(MetricBuilder) -> MetricBuilder,
308 metrics: &ExecutionPlanMetricsSet,
309 ) -> Self {
310 let max = Time::new();
311 builder(MetricBuilder::new(metrics)).build(MetricValue::Time {
312 name: format!("{name}_max").into(),
313 time: max.clone(),
314 });
315 let avg = Time::new();
316 builder(MetricBuilder::new(metrics)).build(MetricValue::Time {
317 name: format!("{name}_avg").into(),
318 time: avg.clone(),
319 });
320 Self {
321 max,
322 avg,
323 max_latency_micros: AtomicU64::new(0),
324 sum_latency_micros: AtomicU64::new(0),
325 count_latency_micros: AtomicU64::new(0),
326 }
327 }
328
329 fn record(&self, start: &Instant) {
330 let micros = start.elapsed().as_micros() as u64;
331 self.max_latency_micros.fetch_max(micros, Ordering::Relaxed);
332 self.sum_latency_micros.fetch_add(micros, Ordering::Relaxed);
333 self.count_latency_micros.fetch_add(1, Ordering::Relaxed);
334 }
335}