// datafusion_federation/schema_cast/mod.rs

1use async_stream::stream;
2use datafusion::arrow::datatypes::SchemaRef;
3use datafusion::error::{DataFusionError, Result};
4use datafusion::execution::{SendableRecordBatchStream, TaskContext};
5use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
6use datafusion::physical_plan::{
7    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
8};
9use futures::StreamExt;
10use std::any::Any;
11use std::clone::Clone;
12use std::fmt;
13use std::sync::Arc;
14
15mod intervals_cast;
16mod lists_cast;
17pub mod record_convert;
18mod struct_cast;
19
/// Execution plan node that wraps an input plan and casts every
/// `RecordBatch` it produces to a target schema via
/// `record_convert::try_cast_to` (see `execute`).
#[derive(Debug)]
#[allow(clippy::module_name_repetitions)]
pub struct SchemaCastScanExec {
    // Child plan whose output batches are cast.
    input: Arc<dyn ExecutionPlan>,
    // Target schema each emitted batch is cast to.
    schema: SchemaRef,
    // Cached plan properties, derived from `input` in `new` (same
    // partitioning, emission type, and boundedness as the child).
    properties: PlanProperties,
}
27
28impl SchemaCastScanExec {
29    pub fn new(input: Arc<dyn ExecutionPlan>, schema: SchemaRef) -> Self {
30        let eq_properties = input.equivalence_properties().clone();
31        let emission_type = input.pipeline_behavior();
32        let boundedness = input.boundedness();
33        let properties = PlanProperties::new(
34            eq_properties,
35            input.output_partitioning().clone(),
36            emission_type,
37            boundedness,
38        );
39        Self {
40            input,
41            schema,
42            properties,
43        }
44    }
45}
46
47impl DisplayAs for SchemaCastScanExec {
48    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
49        write!(f, "SchemaCastScanExec")
50    }
51}
52
53impl ExecutionPlan for SchemaCastScanExec {
54    fn name(&self) -> &str {
55        "SchemaCastScanExec"
56    }
57
58    fn as_any(&self) -> &dyn Any {
59        self
60    }
61
62    fn properties(&self) -> &PlanProperties {
63        &self.properties
64    }
65
66    fn schema(&self) -> SchemaRef {
67        Arc::clone(&self.schema)
68    }
69
70    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
71        vec![&self.input]
72    }
73
74    /// Prevents the introduction of additional `RepartitionExec` and processing input in parallel.
75    /// This guarantees that the input is processed as a single stream, preserving the order of the data.
76    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
77        vec![false]
78    }
79
80    fn with_new_children(
81        self: Arc<Self>,
82        children: Vec<Arc<dyn ExecutionPlan>>,
83    ) -> Result<Arc<dyn ExecutionPlan>> {
84        if children.len() == 1 {
85            Ok(Arc::new(Self::new(
86                Arc::clone(&children[0]),
87                Arc::clone(&self.schema),
88            )))
89        } else {
90            Err(DataFusionError::Execution(
91                "SchemaCastScanExec expects exactly one input".to_string(),
92            ))
93        }
94    }
95
96    fn execute(
97        &self,
98        partition: usize,
99        context: Arc<TaskContext>,
100    ) -> Result<SendableRecordBatchStream> {
101        let mut stream = self.input.execute(partition, context)?;
102        let schema = Arc::clone(&self.schema);
103
104        Ok(Box::pin(RecordBatchStreamAdapter::new(
105            Arc::clone(&schema),
106            {
107                stream! {
108                    while let Some(batch) = stream.next().await {
109                        let batch = record_convert::try_cast_to(batch?, Arc::clone(&schema));
110                        yield batch.map_err(|e| { DataFusionError::External(Box::new(e)) });
111                    }
112                }
113            },
114        )))
115    }
116}