Skip to main content

datafusion_distributed/worker/
task_data.rs

1use crate::MaxLatencyMetric;
2use crate::common::{OnceLockResult, now_ns};
3use crate::distributed_planner::{ProducerHead, insert_producer_head};
4use crate::worker::generated::worker as pb;
5use datafusion::common::{DataFusionError, Result};
6use datafusion::execution::TaskContext;
7use datafusion::physical_expr_common::metrics::CustomMetricValue;
8use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
9use std::sync::Arc;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::time::Duration;
12use tokio::sync::oneshot;
13
14#[derive(Clone, Debug)]
15/// TaskData stores state for a single task being executed by this Endpoint. It may be shared
16/// by concurrent requests for the same task which execute separate partitions.
17pub struct TaskData {
18    /// Task context suitable for execute different partitions from the same task.
19    pub(super) task_ctx: Arc<TaskContext>,
20    pub(crate) base_plan: Arc<dyn ExecutionPlan>,
21    pub(crate) final_plan: Arc<OnceLockResult<Arc<dyn ExecutionPlan>>>,
22    /// `num_partitions_remaining` is initialized to the total number of partitions in the task (not
23    /// only tasks in the partition group). This is decremented for each request to the endpoint
24    /// for this task. Once this count is zero, the task is likely complete. The task may not be
25    /// complete because it's possible that the same partition was retried and this count was
26    /// decremented more than once for the same partition.
27    pub(super) num_partitions_remaining: Arc<AtomicUsize>,
28    /// Sender half of the metrics channel. `impl_execute_task` takes this (via `Option::take`)
29    /// once all partitions have finished or been dropped, sending the collected metrics back to
30    /// the coordinator through the `CoordinatorChannel` side channel.
31    pub(super) metrics_tx: Arc<std::sync::Mutex<Option<oneshot::Sender<pb::TaskMetrics>>>>,
32    /// Metrics related to the execution of a task within a stage. This metrics, instead of being
33    /// associated to a specific node, they are global to the task, like the time at which the plan
34    /// was fed by the coordinator to the worker.
35    pub(super) task_data_metrics: Arc<TaskDataMetrics>,
36}
37
/// Metric name under which `TaskDataMetrics::plan_added_at` is reported.
pub(crate) const PLAN_ADDED_AT_METRIC: &str = "plan_added_at";
/// Metric name under which `TaskDataMetrics::plan_executed_at` is reported.
pub(crate) const PLAN_EXECUTED_AT_METRIC: &str = "plan_executed_at";
/// Metric name under which `TaskDataMetrics::plan_finished_at` is reported.
pub(crate) const PLAN_FINISHED_AT_METRIC: &str = "plan_finished_at";
41
42#[derive(Debug)]
43pub(super) struct TaskDataMetrics {
44    pub(super) query_start_time_ns: u64,
45    /// When the plan was set by the coordinator.
46    pub(super) plan_added_at: MaxLatencyMetric,
47    /// When the plan execution was triggered by the parent worker.
48    pub(super) plan_executed_at: MaxLatencyMetric,
49    /// When the execution stream finished.
50    pub(super) plan_finished_at: MaxLatencyMetric,
51}
52
53impl TaskDataMetrics {
54    pub(super) fn new(query_start_time_ns: u64) -> Self {
55        let plan_added_at = MaxLatencyMetric::default();
56        plan_added_at.add_duration(Duration::from_nanos(
57            now_ns().saturating_sub(query_start_time_ns),
58        ));
59        Self {
60            query_start_time_ns,
61            plan_added_at,
62            plan_finished_at: MaxLatencyMetric::default(),
63            plan_executed_at: MaxLatencyMetric::default(),
64        }
65    }
66
67    pub(super) fn mark_execution_started_once(&self) {
68        if self.plan_executed_at.value() == 0 {
69            self.plan_executed_at.add_duration(Duration::from_nanos(
70                now_ns().saturating_sub(self.query_start_time_ns),
71            ))
72        }
73    }
74
75    pub(super) fn mark_execution_finished(&self) {
76        self.plan_finished_at.add_duration(Duration::from_nanos(
77            now_ns().saturating_sub(self.query_start_time_ns),
78        ))
79    }
80
81    pub(super) fn to_proto_metrics_set(&self) -> pb::MetricsSet {
82        let mut task_metrics_set = pb::MetricsSet { metrics: vec![] };
83
84        fn new_metric(name: &str, value: usize) -> pb::Metric {
85            pb::Metric {
86                partition: None,
87                labels: vec![],
88                value: Some(pb::metric::Value::CustomMaxLatency(pb::MaxLatency {
89                    name: name.to_string(),
90                    value: value as u64,
91                })),
92            }
93        }
94        task_metrics_set.metrics.push(new_metric(
95            PLAN_ADDED_AT_METRIC,
96            self.plan_added_at.as_usize(),
97        ));
98        task_metrics_set.metrics.push(new_metric(
99            PLAN_EXECUTED_AT_METRIC,
100            self.plan_executed_at.as_usize(),
101        ));
102        task_metrics_set.metrics.push(new_metric(
103            PLAN_FINISHED_AT_METRIC,
104            self.plan_finished_at.as_usize(),
105        ));
106
107        task_metrics_set
108    }
109}
110
111impl TaskData {
112    /// Returns the number of partitions remaining to be processed.
113    pub(crate) fn num_partitions_remaining(&self) -> usize {
114        self.num_partitions_remaining.load(Ordering::SeqCst)
115    }
116
117    /// Returns the total number of partitions in this task.
118    pub(crate) fn total_partitions(&self) -> usize {
119        match self.final_plan.get() {
120            Some(Ok(plan)) => plan.output_partitioning().partition_count(),
121            _ => self
122                .base_plan
123                .properties()
124                .output_partitioning()
125                .partition_count(),
126        }
127    }
128
129    pub(crate) fn plan(
130        &self,
131        producer_head: pb::execute_task_request::ProducerHead,
132    ) -> Result<Arc<dyn ExecutionPlan>> {
133        let result = self.final_plan.get_or_init(|| {
134            let producer_head =
135                ProducerHead::from_proto(producer_head, &self.base_plan.schema(), &self.task_ctx)?;
136
137            let plan = insert_producer_head(Arc::clone(&self.base_plan), producer_head)?;
138
139            self.num_partitions_remaining.store(
140                plan.output_partitioning().partition_count(),
141                Ordering::SeqCst,
142            );
143            Ok(plan)
144        });
145        match result {
146            Ok(plan) => Ok(Arc::clone(plan)),
147            Err(err) => Err(DataFusionError::Shared(Arc::clone(err))),
148        }
149    }
150}