datafusion_distributed/worker/
task_data.rs1use crate::MaxLatencyMetric;
2use crate::common::{OnceLockResult, now_ns};
3use crate::distributed_planner::{ProducerHead, insert_producer_head};
4use crate::worker::generated::worker as pb;
5use datafusion::common::{DataFusionError, Result};
6use datafusion::execution::TaskContext;
7use datafusion::physical_expr_common::metrics::CustomMetricValue;
8use datafusion::physical_plan::{ExecutionPlan, ExecutionPlanProperties};
9use std::sync::Arc;
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::time::Duration;
12use tokio::sync::oneshot;
13
14#[derive(Clone, Debug)]
15pub struct TaskData {
18 pub(super) task_ctx: Arc<TaskContext>,
20 pub(crate) base_plan: Arc<dyn ExecutionPlan>,
21 pub(crate) final_plan: Arc<OnceLockResult<Arc<dyn ExecutionPlan>>>,
22 pub(super) num_partitions_remaining: Arc<AtomicUsize>,
28 pub(super) metrics_tx: Arc<std::sync::Mutex<Option<oneshot::Sender<pb::TaskMetrics>>>>,
32 pub(super) task_data_metrics: Arc<TaskDataMetrics>,
36}
37
/// Metric name under which [`TaskDataMetrics`] reports `plan_added_at`.
pub(crate) const PLAN_ADDED_AT_METRIC: &str = "plan_added_at";
/// Metric name under which [`TaskDataMetrics`] reports `plan_executed_at`.
pub(crate) const PLAN_EXECUTED_AT_METRIC: &str = "plan_executed_at";
/// Metric name under which [`TaskDataMetrics`] reports `plan_finished_at`.
pub(crate) const PLAN_FINISHED_AT_METRIC: &str = "plan_finished_at";
41
42#[derive(Debug)]
43pub(super) struct TaskDataMetrics {
44 pub(super) query_start_time_ns: u64,
45 pub(super) plan_added_at: MaxLatencyMetric,
47 pub(super) plan_executed_at: MaxLatencyMetric,
49 pub(super) plan_finished_at: MaxLatencyMetric,
51}
52
53impl TaskDataMetrics {
54 pub(super) fn new(query_start_time_ns: u64) -> Self {
55 let plan_added_at = MaxLatencyMetric::default();
56 plan_added_at.add_duration(Duration::from_nanos(
57 now_ns().saturating_sub(query_start_time_ns),
58 ));
59 Self {
60 query_start_time_ns,
61 plan_added_at,
62 plan_finished_at: MaxLatencyMetric::default(),
63 plan_executed_at: MaxLatencyMetric::default(),
64 }
65 }
66
67 pub(super) fn mark_execution_started_once(&self) {
68 if self.plan_executed_at.value() == 0 {
69 self.plan_executed_at.add_duration(Duration::from_nanos(
70 now_ns().saturating_sub(self.query_start_time_ns),
71 ))
72 }
73 }
74
75 pub(super) fn mark_execution_finished(&self) {
76 self.plan_finished_at.add_duration(Duration::from_nanos(
77 now_ns().saturating_sub(self.query_start_time_ns),
78 ))
79 }
80
81 pub(super) fn to_proto_metrics_set(&self) -> pb::MetricsSet {
82 let mut task_metrics_set = pb::MetricsSet { metrics: vec![] };
83
84 fn new_metric(name: &str, value: usize) -> pb::Metric {
85 pb::Metric {
86 partition: None,
87 labels: vec![],
88 value: Some(pb::metric::Value::CustomMaxLatency(pb::MaxLatency {
89 name: name.to_string(),
90 value: value as u64,
91 })),
92 }
93 }
94 task_metrics_set.metrics.push(new_metric(
95 PLAN_ADDED_AT_METRIC,
96 self.plan_added_at.as_usize(),
97 ));
98 task_metrics_set.metrics.push(new_metric(
99 PLAN_EXECUTED_AT_METRIC,
100 self.plan_executed_at.as_usize(),
101 ));
102 task_metrics_set.metrics.push(new_metric(
103 PLAN_FINISHED_AT_METRIC,
104 self.plan_finished_at.as_usize(),
105 ));
106
107 task_metrics_set
108 }
109}
110
111impl TaskData {
112 pub(crate) fn num_partitions_remaining(&self) -> usize {
114 self.num_partitions_remaining.load(Ordering::SeqCst)
115 }
116
117 pub(crate) fn total_partitions(&self) -> usize {
119 match self.final_plan.get() {
120 Some(Ok(plan)) => plan.output_partitioning().partition_count(),
121 _ => self
122 .base_plan
123 .properties()
124 .output_partitioning()
125 .partition_count(),
126 }
127 }
128
129 pub(crate) fn plan(
130 &self,
131 producer_head: pb::execute_task_request::ProducerHead,
132 ) -> Result<Arc<dyn ExecutionPlan>> {
133 let result = self.final_plan.get_or_init(|| {
134 let producer_head =
135 ProducerHead::from_proto(producer_head, &self.base_plan.schema(), &self.task_ctx)?;
136
137 let plan = insert_producer_head(Arc::clone(&self.base_plan), producer_head)?;
138
139 self.num_partitions_remaining.store(
140 plan.output_partitioning().partition_count(),
141 Ordering::SeqCst,
142 );
143 Ok(plan)
144 });
145 match result {
146 Ok(plan) => Ok(Arc::clone(plan)),
147 Err(err) => Err(DataFusionError::Shared(Arc::clone(err))),
148 }
149 }
150}