datafusion_federation/schema_cast/mod.rs

use async_stream::stream;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::Statistics;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
};
use futures::StreamExt;
use std::any::Any;
use std::clone::Clone;
use std::fmt;
use std::sync::Arc;

mod intervals_cast;
mod lists_cast;
pub mod record_convert;
mod struct_cast;

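/// Wraps an input [`ExecutionPlan`] and casts every record batch it produces
/// to the target schema via `record_convert::try_cast_to`, so downstream
/// operators see batches that match the schema this node reports.
///
/// Minimal usage sketch (`remote_scan` and `federated_schema` are illustrative
/// placeholders, not part of this module):
///
/// ```ignore
/// let casted: Arc<dyn ExecutionPlan> =
///     Arc::new(SchemaCastScanExec::new(remote_scan, federated_schema));
/// ```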
#[derive(Debug)]
#[allow(clippy::module_name_repetitions)]
pub struct SchemaCastScanExec {
    input: Arc<dyn ExecutionPlan>,
    schema: SchemaRef,
    properties: PlanProperties,
    metrics_set: ExecutionPlanMetricsSet,
}

impl SchemaCastScanExec {
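    /// Creates a node that inherits the input's equivalence properties, output
    /// partitioning, emission type, and boundedness, but reports `schema` as its
    /// output schema.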
    pub fn new(input: Arc<dyn ExecutionPlan>, schema: SchemaRef) -> Self {
        let eq_properties = input.equivalence_properties().clone();
        let emission_type = input.pipeline_behavior();
        let boundedness = input.boundedness();
        let properties = PlanProperties::new(
            eq_properties,
            input.output_partitioning().clone(),
            emission_type,
            boundedness,
        );
        Self {
            input,
            schema,
            properties,
            metrics_set: ExecutionPlanMetricsSet::new(),
        }
    }
}

impl DisplayAs for SchemaCastScanExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "SchemaCastScanExec")
    }
}

impl ExecutionPlan for SchemaCastScanExec {
    fn name(&self) -> &str {
        "SchemaCastScanExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Prevents the optimizer from introducing additional `RepartitionExec` nodes and
    /// processing the input in parallel. This guarantees that the input is consumed as a
    /// single stream, preserving the order of the data.
    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if children.len() == 1 {
            Ok(Arc::new(Self::new(
                Arc::clone(&children[0]),
                Arc::clone(&self.schema),
            )))
        } else {
            Err(DataFusionError::Execution(
                "SchemaCastScanExec expects exactly one input".to_string(),
            ))
        }
    }

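    /// Executes the wrapped input for `partition` and lazily casts each batch it
    /// yields to the target schema, timing the cast work and counting output rows
    /// in this node's baseline metrics.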
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let mut stream = self.input.execute(partition, context)?;
        let schema = Arc::clone(&self.schema);
        let baseline_metrics = BaselineMetrics::new(&self.metrics_set, partition);

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&schema),
            {
                stream! {
                    while let Some(batch) = stream.next().await {
                        let _timer = baseline_metrics.elapsed_compute().timer();
                        let batch = record_convert::try_cast_to(batch?, Arc::clone(&schema))
                            .map_err(|e| DataFusionError::External(Box::new(e)));
                        if let Ok(ref b) = batch {
                            baseline_metrics.output_rows().add(b.num_rows());
                        }
                        yield batch;
                    }
                }
            },
        )))
    }

    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        self.input.partition_statistics(partition)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metrics_set.clone_inner())
    }
}