datafusion_federation/schema_cast/mod.rs

use async_stream::stream;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::common::Statistics;
use datafusion::error::{DataFusionError, Result};
use datafusion::execution::{SendableRecordBatchStream, TaskContext};
use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
};
use futures::StreamExt;
use std::any::Any;
use std::clone::Clone;
use std::fmt;
use std::sync::Arc;

mod intervals_cast;
mod lists_cast;
pub mod record_convert;
mod struct_cast;

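/// Execution plan that wraps another `ExecutionPlan` and casts every record
/// batch it produces to the target schema using `record_convert::try_cast_to`.
///
/// A minimal usage sketch, assuming `inner` is an existing
/// `Arc<dyn ExecutionPlan>` and `target_schema` is the `SchemaRef` the caller
/// expects (illustrative names, not defined in this module):
///
/// ```ignore
/// let plan: Arc<dyn ExecutionPlan> =
///     Arc::new(SchemaCastScanExec::new(Arc::clone(&inner), target_schema));
/// ```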
#[derive(Debug)]
#[allow(clippy::module_name_repetitions)]
pub struct SchemaCastScanExec {
    input: Arc<dyn ExecutionPlan>,
    schema: SchemaRef,
    properties: PlanProperties,
}

impl SchemaCastScanExec {
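    /// Creates a `SchemaCastScanExec` that wraps `input`, inheriting the
    /// input's equivalence properties, partitioning, emission type, and
    /// boundedness.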
    pub fn new(input: Arc<dyn ExecutionPlan>, schema: SchemaRef) -> Self {
        let eq_properties = input.equivalence_properties().clone();
        let emission_type = input.pipeline_behavior();
        let boundedness = input.boundedness();
        let properties = PlanProperties::new(
            eq_properties,
            input.output_partitioning().clone(),
            emission_type,
            boundedness,
        );
        Self {
            input,
            schema,
            properties,
        }
    }
}

impl DisplayAs for SchemaCastScanExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "SchemaCastScanExec")
    }
}

impl ExecutionPlan for SchemaCastScanExec {
    fn name(&self) -> &str {
        "SchemaCastScanExec"
    }

    fn as_any(&self) -> &dyn Any {
        self
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    /// Returning `false` signals that this node does not benefit from extra
    /// input partitioning, so the optimizer will not repartition the input on
    /// its behalf.
    fn benefits_from_input_partitioning(&self) -> Vec<bool> {
        vec![false]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if children.len() == 1 {
            Ok(Arc::new(Self::new(
                Arc::clone(&children[0]),
                Arc::clone(&self.schema),
            )))
        } else {
            Err(DataFusionError::Execution(
                "SchemaCastScanExec expects exactly one input".to_string(),
            ))
        }
    }

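    /// Executes the wrapped plan for `partition` and returns a stream that
    /// casts each batch to the target schema via `record_convert::try_cast_to`,
    /// mapping cast failures to `DataFusionError::External`.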
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        let mut stream = self.input.execute(partition, context)?;
        let schema = Arc::clone(&self.schema);

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            Arc::clone(&schema),
            {
                stream! {
                    while let Some(batch) = stream.next().await {
                        let batch = record_convert::try_cast_to(batch?, Arc::clone(&schema));
                        yield batch.map_err(|e| DataFusionError::External(Box::new(e)));
                    }
                }
            },
        )))
    }

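    /// Statistics are taken directly from the wrapped input plan.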
    fn partition_statistics(&self, partition: Option<usize>) -> Result<Statistics> {
        self.input.partition_statistics(partition)
    }
}