// datafusion_federation/schema_cast/mod.rs
use async_stream::stream;
2use datafusion::arrow::datatypes::SchemaRef;
3use datafusion::error::{DataFusionError, Result};
4use datafusion::execution::{SendableRecordBatchStream, TaskContext};
5use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
6use datafusion::physical_plan::{
7 DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
8};
9use futures::StreamExt;
10use std::any::Any;
11use std::clone::Clone;
12use std::fmt;
13use std::sync::Arc;
14
15mod intervals_cast;
16mod lists_cast;
17pub mod record_convert;
18mod struct_cast;
19
20#[derive(Debug)]
21#[allow(clippy::module_name_repetitions)]
22pub struct SchemaCastScanExec {
23 input: Arc<dyn ExecutionPlan>,
24 schema: SchemaRef,
25 properties: PlanProperties,
26}
27
28impl SchemaCastScanExec {
29 pub fn new(input: Arc<dyn ExecutionPlan>, schema: SchemaRef) -> Self {
30 let eq_properties = input.equivalence_properties().clone();
31 let emission_type = input.pipeline_behavior();
32 let boundedness = input.boundedness();
33 let properties = PlanProperties::new(
34 eq_properties,
35 input.output_partitioning().clone(),
36 emission_type,
37 boundedness,
38 );
39 Self {
40 input,
41 schema,
42 properties,
43 }
44 }
45}
46
47impl DisplayAs for SchemaCastScanExec {
48 fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
49 write!(f, "SchemaCastScanExec")
50 }
51}
52
53impl ExecutionPlan for SchemaCastScanExec {
54 fn name(&self) -> &str {
55 "SchemaCastScanExec"
56 }
57
58 fn as_any(&self) -> &dyn Any {
59 self
60 }
61
62 fn properties(&self) -> &PlanProperties {
63 &self.properties
64 }
65
66 fn schema(&self) -> SchemaRef {
67 Arc::clone(&self.schema)
68 }
69
70 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
71 vec![&self.input]
72 }
73
74 fn benefits_from_input_partitioning(&self) -> Vec<bool> {
77 vec![false]
78 }
79
80 fn with_new_children(
81 self: Arc<Self>,
82 children: Vec<Arc<dyn ExecutionPlan>>,
83 ) -> Result<Arc<dyn ExecutionPlan>> {
84 if children.len() == 1 {
85 Ok(Arc::new(Self::new(
86 Arc::clone(&children[0]),
87 Arc::clone(&self.schema),
88 )))
89 } else {
90 Err(DataFusionError::Execution(
91 "SchemaCastScanExec expects exactly one input".to_string(),
92 ))
93 }
94 }
95
96 fn execute(
97 &self,
98 partition: usize,
99 context: Arc<TaskContext>,
100 ) -> Result<SendableRecordBatchStream> {
101 let mut stream = self.input.execute(partition, context)?;
102 let schema = Arc::clone(&self.schema);
103
104 Ok(Box::pin(RecordBatchStreamAdapter::new(
105 Arc::clone(&schema),
106 {
107 stream! {
108 while let Some(batch) = stream.next().await {
109 let batch = record_convert::try_cast_to(batch?, Arc::clone(&schema));
110 yield batch.map_err(|e| { DataFusionError::External(Box::new(e)) });
111 }
112 }
113 },
114 )))
115 }
116}