Skip to main content

datafusion_distributed/coordinator/
distributed.rs

1use crate::common::{require_one_child, serialize_uuid};
2use crate::coordinator::metrics_store::MetricsStore;
3use crate::coordinator::prepare_static_plan::prepare_static_plan;
4use crate::distributed_planner::NetworkBoundaryExt;
5use crate::worker::generated::worker::TaskKey;
6use datafusion::common::internal_datafusion_err;
7use datafusion::common::runtime::JoinSet;
8use datafusion::common::tree_node::{TreeNode, TreeNodeRecursion};
9use datafusion::common::{Result, exec_err};
10use datafusion::execution::{SendableRecordBatchStream, TaskContext};
11use datafusion::physical_expr_common::metrics::MetricsSet;
12use datafusion::physical_plan::metrics::ExecutionPlanMetricsSet;
13use datafusion::physical_plan::stream::RecordBatchReceiverStreamBuilder;
14use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
15use futures::StreamExt;
16use std::fmt::Formatter;
17use std::sync::Arc;
18use std::sync::Mutex;
19
20/// [ExecutionPlan] that executes the inner plan in distributed mode.
21/// Before executing it, two modifications are lazily performed on the plan:
22/// 1. Assigns worker URLs to all the stages. Unless explicitly set in
23///    [crate::TaskEstimator::route_tasks], a random set of URLs are sampled from the
24///    channel resolver and assigned to each task in each stage.
25/// 2. Encodes all the plans in protobuf format so that network boundary nodes can send them
26///    over the wire.
27#[derive(Debug)]
28pub struct DistributedExec {
29    plan: Arc<dyn ExecutionPlan>,
30    prepared_plan: Arc<Mutex<Option<Arc<dyn ExecutionPlan>>>>,
31    metrics: ExecutionPlanMetricsSet,
32    pub(crate) metrics_store: Option<Arc<MetricsStore>>,
33}
34
35pub(super) struct PreparedPlan {
36    pub(super) head_stage: Arc<dyn ExecutionPlan>,
37    pub(super) join_set: JoinSet<Result<()>>,
38}
39
40impl DistributedExec {
41    pub fn new(plan: Arc<dyn ExecutionPlan>) -> Self {
42        Self {
43            plan,
44            prepared_plan: Arc::new(Mutex::new(None)),
45            metrics: ExecutionPlanMetricsSet::new(),
46            metrics_store: None,
47        }
48    }
49
50    /// Enables task metrics collection from remote workers.
51    pub fn with_metrics_collection(mut self, enabled: bool) -> Self {
52        self.metrics_store = match enabled {
53            true => Some(Arc::new(MetricsStore::new())),
54            false => None,
55        };
56        self
57    }
58
59    /// Waits until all worker tasks have reported their metrics back via the coordinator channel.
60    ///
61    /// Metrics are delivered asynchronously after query execution completes, so callers that need
62    /// complete metrics (e.g. for observability or display) should await this before inspecting
63    /// [`Self::task_metrics`] or calling [`rewrite_distributed_plan_with_metrics`].
64    ///
65    /// [`rewrite_distributed_plan_with_metrics`]: crate::rewrite_distributed_plan_with_metrics
66    pub async fn wait_for_metrics(&self) {
67        let mut expected_keys: Vec<TaskKey> = Vec::new();
68        let Some(task_metrics) = &self.metrics_store else {
69            return;
70        };
71        let _ = self.plan.apply(|plan| {
72            if let Some(boundary) = plan.as_network_boundary() {
73                let stage = boundary.input_stage();
74                for i in 0..stage.task_count() {
75                    expected_keys.push(TaskKey {
76                        query_id: serialize_uuid(&stage.query_id()),
77                        stage_id: stage.num() as u64,
78                        task_number: i as u64,
79                    });
80                }
81            }
82            Ok(TreeNodeRecursion::Continue)
83        });
84        if expected_keys.is_empty() {
85            return;
86        }
87        let mut rx = task_metrics.rx.clone();
88        let _ = rx
89            .wait_for(|map| expected_keys.iter().all(|key| map.contains_key(key)))
90            .await;
91    }
92
93    /// Returns the plan which is lazily prepared on `execute()` and actually gets executed.
94    /// It is updated on every call to `execute()`. Returns an error if `.execute()` has not been
95    /// called.
96    pub(crate) fn prepared_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
97        self.prepared_plan
98            .lock()
99            .map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {}", e))?
100            .clone()
101            .ok_or_else(|| {
102                internal_datafusion_err!("No prepared plan found. Was execute() called?")
103            })
104    }
105}
106
107impl DisplayAs for DistributedExec {
108    fn fmt_as(&self, _: DisplayFormatType, f: &mut Formatter) -> std::fmt::Result {
109        write!(f, "DistributedExec")
110    }
111}
112
113impl ExecutionPlan for DistributedExec {
114    fn name(&self) -> &str {
115        "DistributedExec"
116    }
117
118    fn properties(&self) -> &Arc<PlanProperties> {
119        self.plan.properties()
120    }
121
122    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
123        vec![&self.plan]
124    }
125
126    fn with_new_children(
127        self: Arc<Self>,
128        children: Vec<Arc<dyn ExecutionPlan>>,
129    ) -> Result<Arc<dyn ExecutionPlan>> {
130        Ok(Arc::new(DistributedExec {
131            plan: require_one_child(&children)?,
132            prepared_plan: self.prepared_plan.clone(),
133            metrics: self.metrics.clone(),
134            metrics_store: self.metrics_store.clone(),
135        }))
136    }
137
138    fn execute(
139        &self,
140        partition: usize,
141        context: Arc<TaskContext>,
142    ) -> Result<SendableRecordBatchStream> {
143        if partition > 0 {
144            // The DistributedExec node calls try_assign_urls() lazily upon calling .execute(). This means
145            // that .execute() must only be called once, as we cannot afford to perform several
146            // random URL assignation while calling multiple partitions, as they will differ,
147            // producing an invalid plan
148            return exec_err!(
149                "DistributedExec must only have 1 partition, but it was called with partition index {partition}"
150            );
151        }
152
153        let PreparedPlan {
154            head_stage,
155            join_set,
156        } = prepare_static_plan(&self.plan, &self.metrics, &self.metrics_store, &context)?;
157        {
158            let mut guard = self
159                .prepared_plan
160                .lock()
161                .map_err(|e| internal_datafusion_err!("Failed to lock prepared plan: {e}"))?;
162            *guard = Some(head_stage.clone());
163        }
164        let mut builder = RecordBatchReceiverStreamBuilder::new(self.schema(), 1);
165        let tx = builder.tx();
166        // Spawn the task that pulls data from child...
167        builder.spawn(async move {
168            let mut stream = head_stage.execute(partition, context)?;
169            while let Some(msg) = stream.next().await {
170                if tx.send(msg).await.is_err() {
171                    break; // channel closed
172                }
173            }
174            Ok(())
175        });
176        // ...in parallel to the one that feeds the plan to workers.
177        builder.spawn(async move {
178            for res in join_set.join_all().await {
179                res?;
180            }
181            Ok(())
182        });
183        Ok(builder.build())
184    }
185
186    fn metrics(&self) -> Option<MetricsSet> {
187        Some(self.metrics.clone_inner())
188    }
189}