datafusion_federation/schema_cast/mod.rs

use async_stream::stream;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::Statistics;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
};
use futures::StreamExt;
use std::any::Any;
use std::clone::Clone;
use std::fmt;
use std::sync::Arc;

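// Type-specific cast helpers; `record_convert` exposes the `try_cast_to`
// conversion used by `SchemaCastScanExec::execute` below.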
mod intervals_cast;
mod lists_cast;
pub mod record_convert;
mod struct_cast;

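/// Execution plan node that wraps an input plan and casts every record batch it
/// produces to the target schema via `record_convert::try_cast_to`.
///
/// A minimal usage sketch (assuming an `input: Arc<dyn ExecutionPlan>` and a
/// target `schema: SchemaRef` are already in scope):
///
/// ```ignore
/// let plan: Arc<dyn ExecutionPlan> = Arc::new(SchemaCastScanExec::new(input, schema));
/// ```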
#[derive(Debug)]
#[allow(clippy::module_name_repetitions)]
pub struct SchemaCastScanExec {
    input: Arc<dyn ExecutionPlan>,
    schema: SchemaRef,
    properties: PlanProperties,
    metrics_set: ExecutionPlanMetricsSet,
}

impl SchemaCastScanExec {
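    /// Creates a new `SchemaCastScanExec`, deriving output partitioning,
    /// emission type, and boundedness from the wrapped input plan.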
    pub fn new(input: Arc<dyn ExecutionPlan>, schema: SchemaRef) -> Self {
        let eq_properties = input.equivalence_properties().clone();
        let emission_type = input.pipeline_behavior();
        let boundedness = input.boundedness();
        let properties = PlanProperties::new(
            eq_properties,
            input.output_partitioning().clone(),
            emission_type,
            boundedness,
        );
        Self {
            input,
            schema,
            properties,
            metrics_set: ExecutionPlanMetricsSet::new(),
        }
    }
}

impl DisplayAs for SchemaCastScanExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "SchemaCastScanExec")
    }
}

impl ExecutionPlan for SchemaCastScanExec {
    fn name(&self) -> &str {
        "SchemaCastScanExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

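    /// The cast runs batch by batch and preserves row counts, so additional
    /// input partitioning is not expected to benefit this node.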
    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

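    /// Rebuilds the node around a single replacement child, keeping the target
    /// schema; any other number of children is rejected.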
    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if children.len() == 1 {
            Ok(Arc::new(Self::new(
                Arc::clone(&children[0]),
                Arc::clone(&self.schema),
            )))
        } else {
            Err(DataFusionError::Execution(
                "SchemaCastScanExec expects exactly one input".to_string(),
            ))
        }
    }

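    /// Executes the wrapped input for `partition` and casts each resulting
    /// batch to the target schema, recording elapsed compute time and output
    /// row counts in the node's baseline metrics.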
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let mut stream = self.input.execute(partition, context)?;
        let schema = Arc::clone(&self.schema);
        let baseline_metrics = BaselineMetrics::new(&self.metrics_set, partition);

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&schema),
            {
                stream! {
                    while let Some(batch) = stream.next().await {
                        let _timer = baseline_metrics.elapsed_compute().timer();
                        // Forward input errors to the consumer unchanged; cast
                        // successful batches to the target schema.
                        let batch = match batch {
                            Ok(batch) => record_convert::try_cast_to(batch, Arc::clone(&schema))
                                .map_err(|e| DataFusionError::External(Box::new(e))),
                            Err(e) => Err(e),
                        };
                        if let Ok(batch) = &batch {
                            baseline_metrics.output_rows().add(batch.num_rows());
                        }
                        yield batch;
                    }
                }
            },
        )))
    }

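    /// Statistics are taken directly from the wrapped input plan.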
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        self.input.partition_statistics(partition)
    }

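    /// Exposes the baseline metrics (elapsed compute time, output rows)
    /// recorded while streaming in `execute`.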
    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics_set.clone_inner())
    }
}