// datafusion_physical_plan/analyze.rs

use std::any::Any;
use std::sync::Arc;

use super::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
use super::{
    DisplayAs, Distribution, ExecutionPlanProperties, PlanProperties,
    SendableRecordBatchStream,
};
use crate::display::DisplayableExecutionPlan;
use crate::metrics::MetricType;
use crate::{DisplayFormatType, ExecutionPlan, Partitioning};

use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use datafusion_common::instant::Instant;
use datafusion_common::{internal_err, DataFusionError, Result};
use datafusion_execution::TaskContext;
use datafusion_physical_expr::EquivalenceProperties;

use futures::StreamExt;

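/// `EXPLAIN ANALYZE` execution operator: runs its input plan to completion,
/// discards the produced rows, and instead emits a single record batch that
/// contains the input plan annotated with its runtime metrics. In verbose
/// mode it additionally reports full metrics, the total output row count,
/// and the wall-clock duration.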
#[derive(Debug, Clone)]
pub struct AnalyzeExec {
    /// Control how much extra detail is printed
    verbose: bool,
    /// Whether to display statistics in the annotated plan
    show_statistics: bool,
    /// Which categories of metrics to include in the output
    metric_types: Vec<MetricType>,
    /// The input plan being analyzed (run to completion during `execute`)
    pub(crate) input: Arc<dyn ExecutionPlan>,
    /// The output schema of this operator
    schema: SchemaRef,
    /// Cached plan properties returned from `properties()`
    cache: PlanProperties,
}

impl AnalyzeExec {
    /// Create a new `AnalyzeExec` wrapping `input`
    pub fn new(
        verbose: bool,
        show_statistics: bool,
        metric_types: Vec<MetricType>,
        input: Arc<dyn ExecutionPlan>,
        schema: SchemaRef,
    ) -> Self {
        let cache = Self::compute_properties(&input, Arc::clone(&schema));
        AnalyzeExec {
            verbose,
            show_statistics,
            metric_types,
            input,
            schema,
            cache,
        }
    }

    /// Access to the `verbose` flag
    pub fn verbose(&self) -> bool {
        self.verbose
    }

    /// Access to the `show_statistics` flag
    pub fn show_statistics(&self) -> bool {
        self.show_statistics
    }

    /// The input plan being analyzed
    pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
        &self.input
    }

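    /// Compute the cached [`PlanProperties`]: `AnalyzeExec` always produces
    /// a single output partition, regardless of how its input is partitioned.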
    fn compute_properties(
        input: &Arc<dyn ExecutionPlan>,
        schema: SchemaRef,
    ) -> PlanProperties {
        PlanProperties::new(
            EquivalenceProperties::new(schema),
            Partitioning::UnknownPartitioning(1),
            input.pipeline_behavior(),
            input.boundedness(),
        )
    }
}

impl DisplayAs for AnalyzeExec {
    fn fmt_as(
        &self,
        t: DisplayFormatType,
        f: &mut std::fmt::Formatter,
    ) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default | DisplayFormatType::Verbose => {
                write!(f, "AnalyzeExec verbose={}", self.verbose)
            }
            DisplayFormatType::TreeRender => {
                // No extra detail is rendered for this node in tree mode
                write!(f, "")
            }
        }
    }
}

impl ExecutionPlan for AnalyzeExec {
    fn name(&self) -> &'static str {
        "AnalyzeExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.cache
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

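    // AnalyzeExec consumes every input partition itself, so it imposes no
    // particular distribution requirement on its input.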
    fn required_input_distribution(&self) -> Vec<Distribution> {
        vec![Distribution::UnspecifiedDistribution]
    }

    fn with_new_children(
        self: Arc<Self>,
        mut children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(Self::new(
            self.verbose,
            self.show_statistics,
            self.metric_types.clone(),
            children.pop().unwrap(),
            Arc::clone(&self.schema),
        )))
    }

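    // Execution: start one reader per input partition, drain all of their
    // batches (counting rows as a side effect), then emit a single record
    // batch describing the annotated plan.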
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        if 0 != partition {
            return internal_err!(
                "AnalyzeExec invalid partition. Expected 0, got {partition}"
            );
        }

        // Gather a stream over every input partition so that all of the
        // input is executed (and its metrics populated) before reporting
        let num_input_partitions = self.input.output_partitioning().partition_count();
        let mut builder =
            RecordBatchReceiverStream::builder(self.schema(), num_input_partitions);

        for input_partition in 0..num_input_partitions {
            builder.run_input(
                Arc::clone(&self.input),
                input_partition,
                Arc::clone(&context),
            );
        }

        // Capture what the output future needs
        let start = Instant::now();
        let captured_input = Arc::clone(&self.input);
        let captured_schema = Arc::clone(&self.schema);
        let verbose = self.verbose;
        let show_statistics = self.show_statistics;
        let metric_types = self.metric_types.clone();

        // Future that consumes all input batches and then produces the
        // single output batch containing the annotated plan
        let mut input_stream = builder.build();
        let output = async move {
            let mut total_rows = 0;
            while let Some(batch) = input_stream.next().await.transpose()? {
                total_rows += batch.num_rows();
            }

            let duration = Instant::now() - start;
            create_output_batch(
                verbose,
                show_statistics,
                total_rows,
                duration,
                captured_input,
                captured_schema,
                &metric_types,
            )
        };

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&self.schema),
            futures::stream::once(output),
        )))
    }
}

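/// Creates the single output RecordBatch: a string column of entry types
/// (e.g. "Plan with Metrics") paired with a string column of rendered
/// values, using the schema supplied by the caller.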
fn create_output_batch(
    verbose: bool,
    show_statistics: bool,
    total_rows: usize,
    duration: std::time::Duration,
    input: Arc<dyn ExecutionPlan>,
    schema: SchemaRef,
    metric_types: &[MetricType],
) -> Result<RecordBatch> {
    let mut type_builder = StringBuilder::with_capacity(1, 1024);
    let mut plan_builder = StringBuilder::with_capacity(1, 1024);

    type_builder.append_value("Plan with Metrics");

    let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref())
        .set_metric_types(metric_types.to_vec())
        .set_show_statistics(show_statistics)
        .indent(verbose)
        .to_string();
    plan_builder.append_value(annotated_plan);

    // Verbose output additionally includes full metrics, the total row
    // count, and the overall duration
    if verbose {
        type_builder.append_value("Plan with Full Metrics");

        let annotated_plan = DisplayableExecutionPlan::with_full_metrics(input.as_ref())
            .set_metric_types(metric_types.to_vec())
            .set_show_statistics(show_statistics)
            .indent(verbose)
            .to_string();
        plan_builder.append_value(annotated_plan);

        type_builder.append_value("Output Rows");
        plan_builder.append_value(total_rows.to_string());

        type_builder.append_value("Duration");
        plan_builder.append_value(format!("{duration:?}"));
    }

    RecordBatch::try_new(
        schema,
        vec![
            Arc::new(type_builder.finish()),
            Arc::new(plan_builder.finish()),
        ],
    )
    .map_err(DataFusionError::from)
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{
        collect,
        test::{
            assert_is_pending,
            exec::{assert_strong_count_converges_to_zero, BlockingExec},
        },
    };

    use arrow::datatypes::{DataType, Field, Schema};
    use futures::FutureExt;

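    // Dropping the output future before it completes must cancel execution
    // of the (blocking) input and release all references to it.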
    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let task_ctx = Arc::new(TaskContext::default());
        let schema =
            Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

        let blocking_exec = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
        let refs = blocking_exec.refs();
        let analyze_exec = Arc::new(AnalyzeExec::new(
            true,
            false,
            vec![MetricType::SUMMARY, MetricType::DEV],
            blocking_exec,
            schema,
        ));

        let fut = collect(analyze_exec, task_ctx);
        let mut fut = fut.boxed();

        assert_is_pending(&mut fut);
        drop(fut);
        assert_strong_count_converges_to_zero(refs).await;

        Ok(())
    }
}