datafusion_physical_plan/
analyze.rs1use std::any::Any;
21use std::sync::Arc;
22
23use super::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter};
24use super::{
25 DisplayAs, Distribution, ExecutionPlanProperties, PlanProperties,
26 SendableRecordBatchStream,
27};
28use crate::display::DisplayableExecutionPlan;
29use crate::{DisplayFormatType, ExecutionPlan, Partitioning};
30
31use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
32use datafusion_common::instant::Instant;
33use datafusion_common::{internal_err, DataFusionError, Result};
34use datafusion_execution::TaskContext;
35use datafusion_physical_expr::EquivalenceProperties;
36
37use futures::StreamExt;
38
39#[derive(Debug, Clone)]
42pub struct AnalyzeExec {
43 verbose: bool,
45 show_statistics: bool,
47 pub(crate) input: Arc<dyn ExecutionPlan>,
49 schema: SchemaRef,
51 cache: PlanProperties,
52}
53
54impl AnalyzeExec {
55 pub fn new(
57 verbose: bool,
58 show_statistics: bool,
59 input: Arc<dyn ExecutionPlan>,
60 schema: SchemaRef,
61 ) -> Self {
62 let cache = Self::compute_properties(&input, Arc::clone(&schema));
63 AnalyzeExec {
64 verbose,
65 show_statistics,
66 input,
67 schema,
68 cache,
69 }
70 }
71
72 pub fn verbose(&self) -> bool {
74 self.verbose
75 }
76
77 pub fn show_statistics(&self) -> bool {
79 self.show_statistics
80 }
81
82 pub fn input(&self) -> &Arc<dyn ExecutionPlan> {
84 &self.input
85 }
86
87 fn compute_properties(
89 input: &Arc<dyn ExecutionPlan>,
90 schema: SchemaRef,
91 ) -> PlanProperties {
92 PlanProperties::new(
93 EquivalenceProperties::new(schema),
94 Partitioning::UnknownPartitioning(1),
95 input.pipeline_behavior(),
96 input.boundedness(),
97 )
98 }
99}
100
101impl DisplayAs for AnalyzeExec {
102 fn fmt_as(
103 &self,
104 t: DisplayFormatType,
105 f: &mut std::fmt::Formatter,
106 ) -> std::fmt::Result {
107 match t {
108 DisplayFormatType::Default | DisplayFormatType::Verbose => {
109 write!(f, "AnalyzeExec verbose={}", self.verbose)
110 }
111 DisplayFormatType::TreeRender => {
112 write!(f, "")
114 }
115 }
116 }
117}
118
119impl ExecutionPlan for AnalyzeExec {
120 fn name(&self) -> &'static str {
121 "AnalyzeExec"
122 }
123
124 fn as_any(&self) -> &dyn Any {
126 self
127 }
128
129 fn properties(&self) -> &PlanProperties {
130 &self.cache
131 }
132
133 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
134 vec![&self.input]
135 }
136
137 fn required_input_distribution(&self) -> Vec<Distribution> {
138 vec![Distribution::UnspecifiedDistribution]
139 }
140
141 fn with_new_children(
142 self: Arc<Self>,
143 mut children: Vec<Arc<dyn ExecutionPlan>>,
144 ) -> Result<Arc<dyn ExecutionPlan>> {
145 Ok(Arc::new(Self::new(
146 self.verbose,
147 self.show_statistics,
148 children.pop().unwrap(),
149 Arc::clone(&self.schema),
150 )))
151 }
152
153 fn execute(
154 &self,
155 partition: usize,
156 context: Arc<TaskContext>,
157 ) -> Result<SendableRecordBatchStream> {
158 if 0 != partition {
159 return internal_err!(
160 "AnalyzeExec invalid partition. Expected 0, got {partition}"
161 );
162 }
163
164 let num_input_partitions = self.input.output_partitioning().partition_count();
168 let mut builder =
169 RecordBatchReceiverStream::builder(self.schema(), num_input_partitions);
170
171 for input_partition in 0..num_input_partitions {
172 builder.run_input(
173 Arc::clone(&self.input),
174 input_partition,
175 Arc::clone(&context),
176 );
177 }
178
179 let start = Instant::now();
181 let captured_input = Arc::clone(&self.input);
182 let captured_schema = Arc::clone(&self.schema);
183 let verbose = self.verbose;
184 let show_statistics = self.show_statistics;
185
186 let mut input_stream = builder.build();
190 let output = async move {
191 let mut total_rows = 0;
192 while let Some(batch) = input_stream.next().await.transpose()? {
193 total_rows += batch.num_rows();
194 }
195
196 let duration = Instant::now() - start;
197 create_output_batch(
198 verbose,
199 show_statistics,
200 total_rows,
201 duration,
202 captured_input,
203 captured_schema,
204 )
205 };
206
207 Ok(Box::pin(RecordBatchStreamAdapter::new(
208 Arc::clone(&self.schema),
209 futures::stream::once(output),
210 )))
211 }
212}
213
214fn create_output_batch(
216 verbose: bool,
217 show_statistics: bool,
218 total_rows: usize,
219 duration: std::time::Duration,
220 input: Arc<dyn ExecutionPlan>,
221 schema: SchemaRef,
222) -> Result<RecordBatch> {
223 let mut type_builder = StringBuilder::with_capacity(1, 1024);
224 let mut plan_builder = StringBuilder::with_capacity(1, 1024);
225
226 type_builder.append_value("Plan with Metrics");
228
229 let annotated_plan = DisplayableExecutionPlan::with_metrics(input.as_ref())
230 .set_show_statistics(show_statistics)
231 .indent(verbose)
232 .to_string();
233 plan_builder.append_value(annotated_plan);
234
235 if verbose {
238 type_builder.append_value("Plan with Full Metrics");
239
240 let annotated_plan = DisplayableExecutionPlan::with_full_metrics(input.as_ref())
241 .set_show_statistics(show_statistics)
242 .indent(verbose)
243 .to_string();
244 plan_builder.append_value(annotated_plan);
245
246 type_builder.append_value("Output Rows");
247 plan_builder.append_value(total_rows.to_string());
248
249 type_builder.append_value("Duration");
250 plan_builder.append_value(format!("{duration:?}"));
251 }
252
253 RecordBatch::try_new(
254 schema,
255 vec![
256 Arc::new(type_builder.finish()),
257 Arc::new(plan_builder.finish()),
258 ],
259 )
260 .map_err(DataFusionError::from)
261}
262
#[cfg(test)]
mod tests {
    use super::*;
    use crate::collect;
    use crate::test::assert_is_pending;
    use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};

    use arrow::datatypes::{DataType, Field, Schema};
    use futures::FutureExt;

    /// Dropping the pending `collect` future must release every reference to
    /// the blocking input plan.
    #[tokio::test]
    async fn test_drop_cancel() -> Result<()> {
        let ctx = Arc::new(TaskContext::default());
        let fields = vec![Field::new("a", DataType::Float32, true)];
        let schema = Arc::new(Schema::new(fields));

        // The blocking input keeps the analyze future pending.
        let blocking = Arc::new(BlockingExec::new(Arc::clone(&schema), 1));
        let refs = blocking.refs();
        let analyze = Arc::new(AnalyzeExec::new(true, false, blocking, schema));

        let mut pending = collect(analyze, ctx).boxed();
        assert_is_pending(&mut pending);

        // Once the future is gone, the reference count must converge to zero.
        drop(pending);
        assert_strong_count_converges_to_zero(refs).await;

        Ok(())
    }
}