datafusion_federation/schema_cast/mod.rs
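//! A physical-plan wrapper that casts the record batches produced by a child
//! `ExecutionPlan` to a target Arrow schema. The submodules declared below hold
//! the conversion helpers; `record_convert` is the batch-level entry point used
//! by `SchemaCastScanExec::execute`.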

use async_stream::stream;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::Statistics;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
};
use futures::StreamExt;
use std::any::Any;
use std::clone::Clone;
use std::fmt;
use std::sync::Arc;

mod intervals_cast;
mod lists_cast;
pub mod record_convert;
mod struct_cast;

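/// Wraps an input `ExecutionPlan` and re-exposes its output under `schema`;
/// every record batch the input produces is cast to that schema at execution
/// time via `record_convert::try_cast_to`.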
#[derive(Debug)]
#[allow(clippy::module_name_repetitions)]
pub struct SchemaCastScanExec {
    input: Arc<dyn ExecutionPlan>,
    schema: SchemaRef,
    properties: PlanProperties,
}

impl SchemaCastScanExec {
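    /// Builds a `SchemaCastScanExec` around `input`, reusing the input's
    /// equivalence, partitioning, emission-type, and boundedness properties
    /// unchanged.
    ///
    /// A minimal usage sketch, assuming an `input_plan: Arc<dyn ExecutionPlan>`
    /// and a `target_schema: SchemaRef` already exist in the caller's scope:
    ///
    /// ```ignore
    /// let cast_scan: Arc<dyn ExecutionPlan> =
    ///     Arc::new(SchemaCastScanExec::new(Arc::clone(&input_plan), Arc::clone(&target_schema)));
    /// ```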
    pub fn new(input: Arc<dyn ExecutionPlan>, schema: SchemaRef) -> Self {
        let eq_properties = input.equivalence_properties().clone();
        let emission_type = input.pipeline_behavior();
        let boundedness = input.boundedness();
        let properties = PlanProperties::new(
            eq_properties,
            input.output_partitioning().clone(),
            emission_type,
            boundedness,
        );
        Self {
            input,
            schema,
            properties,
        }
    }
}

impl DisplayAs for SchemaCastScanExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "SchemaCastScanExec")
    }
}

impl ExecutionPlan for SchemaCastScanExec {
    fn name(&self) -> &str {
        "SchemaCastScanExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Prevents the optimizer from introducing additional `RepartitionExec` nodes that would
    /// process the input in parallel. This guarantees that the input is consumed as a single
    /// stream, preserving the order of the data.
    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if children.len() == 1 {
            Ok(Arc::new(Self::new(
                Arc::clone(&children[0]),
                Arc::clone(&self.schema),
            )))
        } else {
            Err(DataFusionError::Execution(
                "SchemaCastScanExec expects exactly one input".to_string(),
            ))
        }
    }

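    /// Executes the child plan for `partition` and wraps its stream in a
    /// `RecordBatchStreamAdapter` that casts each incoming batch to the target schema;
    /// conversion failures are surfaced as `DataFusionError::External`.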
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let mut stream = self.input.execute(partition, context)?;
        let schema = Arc::clone(&self.schema);

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&schema),
            {
                stream! {
                    while let Some(batch) = stream.next().await {
                        let batch = record_convert::try_cast_to(batch?, Arc::clone(&schema));
                        yield batch.map_err(|e| DataFusionError::External(Box::new(e)));
                    }
                }
            },
        )))
    }

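    /// Statistics are delegated to the input plan; the schema cast itself does not
    /// add or remove rows.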
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        self.input.partition_statistics(partition)
    }
}