1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use std::any::Any;
use std::sync::Arc;
use crate::error::{ExecutionError, Result};
use crate::{
logical_plan::StringifiedPlan,
physical_plan::{common::RecordBatchIterator, ExecutionPlan},
};
use arrow::{array::StringBuilder, datatypes::SchemaRef, record_batch::RecordBatch};
use crate::physical_plan::Partitioning;
use super::SendableRecordBatchReader;
use async_trait::async_trait;
/// Physical plan node backing an `EXPLAIN` query.
///
/// It carries pre-rendered plan descriptions and, when executed, hands them
/// back as a single record batch instead of running any query.
#[derive(Clone, Debug)]
pub struct ExplainExec {
    /// Schema attached to the record batch produced by `execute`.
    schema: SchemaRef,
    /// Plans, already rendered to strings, emitted one per output row.
    stringified_plans: Vec<StringifiedPlan>,
}
impl ExplainExec {
pub fn new(schema: SchemaRef, stringified_plans: Vec<StringifiedPlan>) -> Self {
ExplainExec {
schema,
stringified_plans,
}
}
}
#[async_trait]
impl ExecutionPlan for ExplainExec {
    /// Exposes this node as `Any` so callers can downcast to `ExplainExec`.
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Returns a clone of the schema handle supplied at construction.
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }

    /// This node is a leaf: it never has child plans.
    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
        Vec::new()
    }

    /// Reports `UnknownPartitioning(1)`; `execute` only accepts partition 0.
    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(1)
    }

    /// Returns a copy of this node when `children` is empty.
    ///
    /// # Errors
    ///
    /// Returns `ExecutionError::General` if any children are supplied, since
    /// this node has no children to replace.
    fn with_new_children(
        &self,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if !children.is_empty() {
            return Err(ExecutionError::General(format!(
                "Children cannot be replaced in {:?}",
                self
            )));
        }

        Ok(Arc::new(self.clone()))
    }

    /// Produces a reader over one record batch with a row per stored plan.
    ///
    /// The batch has two string columns, in this order: the plan type and
    /// the plan text. The schema given at construction is presumably expected
    /// to describe exactly those two columns (checked by
    /// `RecordBatch::try_new`, not here).
    ///
    /// # Errors
    ///
    /// Returns `ExecutionError::General` when `partition` is not 0, and
    /// propagates any error from the string builders or from
    /// `RecordBatch::try_new`.
    async fn execute(&self, partition: usize) -> Result<SendableRecordBatchReader> {
        if partition != 0 {
            return Err(ExecutionError::General(format!(
                "ExplainExec invalid partition {}",
                partition
            )));
        }

        // One output row per stringified plan, so both builders share a capacity.
        let row_count = self.stringified_plans.len();
        let mut type_builder = StringBuilder::new(row_count);
        let mut plan_builder = StringBuilder::new(row_count);

        for entry in self.stringified_plans.iter() {
            let plan_type: String = (&entry.plan_type).into();
            type_builder.append_value(&plan_type)?;
            plan_builder.append_value(&entry.plan)?;
        }

        // Column order matters: plan type first, plan text second.
        let batch = RecordBatch::try_new(
            self.schema.clone(),
            vec![
                Arc::new(type_builder.finish()),
                Arc::new(plan_builder.finish()),
            ],
        )?;

        let reader = RecordBatchIterator::new(self.schema.clone(), vec![Arc::new(batch)]);
        Ok(Box::new(reader))
    }
}