datafusion_physical_plan/
analyze.rs1use std::sync::Arc;
21
22use super::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
23use super::{
24 DisplayAs, Distribution, ExecutionPlanProperties, PlanProperties,
25 SendableRecordBatchStream,
26};
27use crate::display::DisplayableExecutionPlan;
28use crate::metrics::{MetricCategory, MetricType};
29use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
30
31use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
32use datafusion_common::instant::Instant;
33use datafusion_common::{DataFusionError, Result, assert_eq_or_internal_err};
34use datafusion_execution::TaskContext;
35use datafusion_physical_expr::EquivalenceProperties;
36
37use futures::StreamExt;
38
39#[derive(Debug, Clone)]
42pub struct AnalyzeExec {
43 verbose: bool,
45 show_statistics: bool,
47 metric_types: Vec<MetricType>,
49 metric_categories: Option<Vec<MetricCategory>>,
51 pub(crate) input: Arc<dyn ExecutionPlan>,
53 schema: SchemaRef,
55 cache: Arc<PlanProperties>,
56}
57
58impl AnalyzeExec {
59 pub fn new(
61 verbose: bool,
62 show_statistics: bool,
63 metric_types: Vec<MetricType>,
64 metric_categories: Option<Vec<MetricCategory>>,
65 input: Arc<dyn ExecutionPlan>,
66 schema: SchemaRef,
67 ) -> Self {
68 let cache = Self::compute_properties(&input, Arc::clone(&schema));
69 AnalyzeExec {
70 verbose,
71 show_statistics,
72 metric_types,
73 metric_categories,
74 input,
75 schema,
76 cache: Arc::new(cache),
77 }
78 }
79
80 pub fn verbose(&self) -> bool {
82 self.verbose
83 }
84
85 pub fn show_statistics(&self) -> bool {
87 self.show_statistics
88 }
89
90 pub fn metric_categories(&self) -> Option<&[MetricCategory]> {
92 self.metric_categories.as_deref()
93 }
94
95 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
97 &self.input
98 }
99
100 fn compute_properties(
102 input: &Arc<dyn ExecutionPlan>,
103 schema: SchemaRef,
104 ) -> PlanProperties {
105 PlanProperties::new(
106 EquivalenceProperties::new(schema),
107 Partitioning::UnknownPartitioning(1),
108 input.pipeline_behavior(),
109 input.boundedness(),
110 )
111 }
112}
113
114impl DisplayAs for AnalyzeExec {
115 fn fmt_as(
116 &self,
117 t: DisplayFormatType,
118 f: &mut std::fmt::Formatter,
119 ) -> std::fmt::Result {
120 match t {
121 DisplayFormatType::Default | DisplayFormatType::Verbose => {
122 write!(f, "AnalyzeExec verbose={}", self.verbose)
123 }
124 DisplayFormatType::TreeRender => {
125 write!(f, "")
127 }
128 }
129 }
130}
131
132impl ExecutionPlan for AnalyzeExec {
133 fn name(&self) -> &'static str {
134 "AnalyzeExec"
135 }
136
137 fn properties(&self) -> &Arc<PlanProperties> {
139 &self.cache
140 }
141
142 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
143 vec![&self.input]
144 }
145
146 fn required_input_distribution(&self) -> Vec<Distribution> {
147 vec![Distribution::UnspecifiedDistribution]
148 }
149
150 fn with_new_children(
151 self: Arc<Self>,
152 mut children: Vec<Arc<dyn ExecutionPlan>>,
153 ) -> Result<Arc<dyn ExecutionPlan>> {
154 Ok(Arc::new(Self::new(
155 self.verbose,
156 self.show_statistics,
157 self.metric_types.clone(),
158 self.metric_categories.clone(),
159 children.pop().unwrap(),
160 Arc::clone(&self.schema),
161 )))
162 }
163
164 fn execute(
165 &self,
166 partition: usize,
167 context: Arc<TaskContext>,
168 ) -> Result<SendableRecordBatchStream> {
169 assert_eq_or_internal_err!(
170 partition,
171 0,
172 "AnalyzeExec invalid partition. Expected 0, got {partition}"
173 );
174
175 let num_input_partitions = self.input.output_partitioning().partition_count();
179 let mut builder =
180 RecordBatchReceiverStream::builder(self.schema(), num_input_partitions);
181
182 for input_partition in 0..num_input_partitions {
183 builder.run_input(
184 Arc::clone(&self.input),
185 input_partition,
186 Arc::clone(&context),
187 );
188 }
189
190 let start = Instant::now();
192 let captured_input = Arc::clone(&self.input);
193 let captured_schema = Arc::clone(&self.schema);
194 let verbose = self.verbose;
195 let show_statistics = self.show_statistics;
196 let metric_types = self.metric_types.clone();
197 let metric_categories = self.metric_categories.clone();
198
199 let mut input_stream = builder.build();
203 let output = async move {
204 let mut total_rows = 0;
205 while let Some(batch) = input_stream.next().await.transpose()? {
206 total_rows += batch.num_rows();
207 }
208 drop(input_stream);
209
210 let duration = Instant::now() - start;
211 create_output_batch(
212 verbose,
213 show_statistics,
214 total_rows,
215 duration,
216 &captured_input,
217 &captured_schema,
218 &metric_types,
219 metric_categories.as_deref(),
220 )
221 };
222
223 Ok(Box::pin(RecordBatchStreamAdapter::new(
224 Arc::clone(&self.schema),
225 futures::stream::once(output),
226 )))
227 }
228}
229
230#[expect(clippy::too_many_arguments)]
232fn create_output_batch(
233 verbose: bool,
234 show_statistics: bool,
235 total_rows: usize,
236 duration: std::time::Duration,
237 input: &Arc<dyn ExecutionPlan>,
238 schema: &SchemaRef,
239 metric_types: &[MetricType],
240 metric_categories: Option<&[MetricCategory]>,
241) -> Result<RecordBatch> {
242 let mut type_builder = StringBuilder::with_capacity(1, 1024);
243 let mut plan_builder = StringBuilder::with_capacity(1, 1024);
244
245 type_builder.append_value("Plan with Metrics");
247
248 let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref())
249 .set_metric_types(metric_types.to_vec())
250 .set_metric_categories(metric_categories.map(|c| c.to_vec()))
251 .set_show_statistics(show_statistics)
252 .indent(verbose)
253 .to_string();
254 plan_builder.append_value(annotated_plan);
255
256 if verbose {
259 type_builder.append_value("Plan with Full Metrics");
260
261 let annotated_plan = DisplayableExecutionPlan::with_full_metrics(input.as_ref())
262 .set_metric_types(metric_types.to_vec())
263 .set_metric_categories(metric_categories.map(|c| c.to_vec()))
264 .set_show_statistics(show_statistics)
265 .indent(verbose)
266 .to_string();
267 plan_builder.append_value(annotated_plan);
268
269 type_builder.append_value("Output Rows");
270 plan_builder.append_value(total_rows.to_string());
271
272 type_builder.append_value("Duration");
273 plan_builder.append_value(format!("{duration:?}"));
274 }
275
276 RecordBatch::try_new(
277 Arc::clone(schema),
278 vec![
279 Arc::new(type_builder.finish()),
280 Arc::new(plan_builder.finish()),
281 ],
282 )
283 .map_err(DataFusionError::from)
284}
285
#[cfg(test)]
mod tests {
    use super::*;
    use crate::collect;
    use crate::test::assert_is_pending;
    use crate::test::exec::{BlockingExec, assert_strong_count_converges_to_zero};

    use arrow::datatypes::{DataType, Field, Schema};
    use futures::FutureExt;

    /// Dropping a still-pending `collect` future over an `AnalyzeExec` must
    /// let go of the input: the references handed out by the blocking input
    /// are expected to converge to a strong count of zero.
    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        let input = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
        let refs = input.refs();

        let metric_types = vec![MetricType::Summary, MetricType::Dev];
        let plan = Arc::new(AnalyzeExec::new(
            true,
            false,
            metric_types,
            None,
            input,
            schema,
        ));

        let mut pending = collect(plan, Arc::new(TaskContext::default())).boxed();

        // The future must not have completed yet; then cancel it by dropping.
        assert_is_pending(&mut pending);
        drop(pending);
        assert_strong_count_converges_to_zero(refs).await;

        Ok(())
    }
}