datafusion_federation/schema_cast/
mod.rs1use async_stream::stream;
2use datafusion::arrow::datatypes::SchemaRef;
3use datafusion::common::Statistics;
4use datafusion::config::ConfigOptions;
5use datafusion::error::{DataFusionError, Result};
6use datafusion::execution::{SendableRecordBatchStream, TaskContext};
7use datafusion::physical_plan::filter_pushdown::{FilterDescription, FilterPushdownPhase};
8use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
9use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
10use datafusion::physical_plan::{
11 DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PhysicalExpr,
12 PlanProperties,
13};
14use futures::StreamExt;
15use std::any::Any;
16use std::clone::Clone;
17use std::fmt;
18use std::sync::Arc;
19
20mod intervals_cast;
21mod lists_cast;
22pub mod record_convert;
23mod struct_cast;
24
/// Execution plan node that wraps a child [`ExecutionPlan`] and casts every
/// record batch it produces to a target schema (via
/// `record_convert::try_cast_to` — see `execute`).
#[derive(Debug)]
#[allow(clippy::module_name_repetitions)]
pub struct SchemaCastScanExec {
    // Child plan whose output batches are cast.
    input: Arc<dyn ExecutionPlan>,
    // Target schema each output batch is cast to; also what `schema()` reports.
    schema: SchemaRef,
    // Plan properties inherited from the input plan in `new` (equivalence,
    // partitioning, emission type, boundedness).
    properties: PlanProperties,
    // Per-partition execution metrics (elapsed compute time, output rows).
    metrics_set: ExecutionPlanMetricsSet,
}
33
34impl SchemaCastScanExec {
35 pub fn new(input: Arc<dyn ExecutionPlan>, schema: SchemaRef) -> Self {
36 let eq_properties = input.equivalence_properties().clone();
37 let emission_type = input.pipeline_behavior();
38 let boundedness = input.boundedness();
39 let properties = PlanProperties::new(
40 eq_properties,
41 input.output_partitioning().clone(),
42 emission_type,
43 boundedness,
44 );
45 Self {
46 input,
47 schema,
48 properties,
49 metrics_set: ExecutionPlanMetricsSet::new(),
50 }
51 }
52}
53
54impl DisplayAs for SchemaCastScanExec {
55 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
56 write!(f, "SchemaCastScanExec")
57 }
58}
59
impl ExecutionPlan for SchemaCastScanExec {
    fn name(&self) -> &str {
        "SchemaCastScanExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    /// Reports the target (cast-to) schema, not the input plan's schema.
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    // Casting is a cheap per-batch transform, so the optimizer is told not to
    // repartition the input solely for this node's benefit.
    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        // Exactly one child is expected, mirroring `children()` above.
        if children.len() == 1 {
            Ok(Arc::new(Self::new(
                Arc::clone(&children[0]),
                Arc::clone(&self.schema),
            )))
        } else {
            Err(DataFusionError::Execution(
                "SchemaCastScanExec expects exactly one input".to_string(),
            ))
        }
    }

    /// Executes the child partition and lazily casts each batch to the target
    /// schema as it flows through. Cast failures are surfaced as
    /// [`DataFusionError::External`] items on the stream rather than ending it.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let mut stream = self.input.execute(partition, context)?;
        let schema = Arc::clone(&self.schema);
        let baseline_metrics = BaselineMetrics::new(&self.metrics_set, partition);

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&schema),
            {
                stream! {
                    while let Some(batch) = stream.next().await {
                        // Timer is created only after the input batch arrives,
                        // so elapsed_compute counts the cast work, not time
                        // spent awaiting the child stream.
                        let _timer = baseline_metrics.elapsed_compute().timer();
                        let batch = record_convert::try_cast_to(batch?, Arc::clone(&schema));
                        let batch = batch.map_err(|e| { DataFusionError::External(Box::new(e)) });
                        // Only successfully cast batches contribute to the
                        // output-row metric; errors are still yielded below.
                        if let Ok(ref b) = batch {
                            baseline_metrics.output_rows().add(b.num_rows());
                        }
                        yield batch;
                    }
                }
            },
        )))
    }

    // Statistics are delegated to the input plan unchanged.
    // NOTE(review): column-level stats therefore describe the pre-cast column
    // types — presumably acceptable for planning purposes; confirm.
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        self.input.partition_statistics(partition)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics_set.clone_inner())
    }

    // Parent filters are forwarded straight to the child, making this node
    // transparent to filter pushdown.
    fn gather_filters_for_pushdown(
        &self,
        _phase: FilterPushdownPhase,
        parent_filters: Vec<Arc<dyn PhysicalExpr>>,
        _config: &ConfigOptions,
    ) -> Result<FilterDescription> {
        FilterDescription::from_children(parent_filters, &self.children())
    }
}