// datafusion_remote_table/exec.rs
exec.rs1use crate::transform::transform_batch;
2use crate::{Connection, ConnectionOptions, DFResult, RemoteSchema, Transform};
3use datafusion::arrow::array::RecordBatch;
4use datafusion::arrow::datatypes::SchemaRef;
5use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
6use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
7use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
8use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
9use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
10use futures::{Stream, StreamExt, TryStreamExt};
11use std::any::Any;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context, Poll};
15
16#[derive(Debug)]
17pub struct RemoteTableExec {
18 pub(crate) conn_options: ConnectionOptions,
19 pub(crate) sql: String,
20 pub(crate) projection: Option<Vec<usize>>,
21 pub(crate) transform: Option<Arc<dyn Transform>>,
22 conn: Arc<dyn Connection>,
23 plan_properties: PlanProperties,
24}
25
26impl RemoteTableExec {
27 pub fn new(
28 conn_options: ConnectionOptions,
29 projected_schema: SchemaRef,
30 sql: String,
31 projection: Option<Vec<usize>>,
32 transform: Option<Arc<dyn Transform>>,
33 conn: Arc<dyn Connection>,
34 ) -> Self {
35 let plan_properties = PlanProperties::new(
36 EquivalenceProperties::new(projected_schema),
37 Partitioning::UnknownPartitioning(1),
38 EmissionType::Incremental,
39 Boundedness::Bounded,
40 );
41 Self {
42 conn_options,
43 sql,
44 projection,
45 transform,
46 conn,
47 plan_properties,
48 }
49 }
50}
51
52impl ExecutionPlan for RemoteTableExec {
53 fn name(&self) -> &str {
54 "RemoteTableExec"
55 }
56
57 fn as_any(&self) -> &dyn Any {
58 self
59 }
60
61 fn properties(&self) -> &PlanProperties {
62 &self.plan_properties
63 }
64
65 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
66 vec![]
67 }
68
69 fn with_new_children(
70 self: Arc<Self>,
71 _children: Vec<Arc<dyn ExecutionPlan>>,
72 ) -> DFResult<Arc<dyn ExecutionPlan>> {
73 Ok(self)
74 }
75
76 fn execute(
77 &self,
78 partition: usize,
79 _context: Arc<TaskContext>,
80 ) -> DFResult<SendableRecordBatchStream> {
81 assert_eq!(partition, 0);
82 let schema = self.schema();
83 let fut = build_and_transform_stream(
84 self.conn.clone(),
85 self.sql.clone(),
86 self.projection.clone(),
87 self.transform.clone(),
88 schema.clone(),
89 );
90 let stream = futures::stream::once(fut).try_flatten();
91 Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
92 }
93}
94
95async fn build_and_transform_stream(
96 conn: Arc<dyn Connection>,
97 sql: String,
98 projection: Option<Vec<usize>>,
99 transform: Option<Arc<dyn Transform>>,
100 schema: SchemaRef,
101) -> DFResult<SendableRecordBatchStream> {
102 let (stream, remote_schema) = conn.query(sql, projection).await?;
103 assert_eq!(schema.fields().len(), remote_schema.fields.len());
104 if let Some(transform) = transform.as_ref() {
105 Ok(Box::pin(TransformStream {
106 input: stream,
107 transform: transform.clone(),
108 schema,
109 remote_schema,
110 }))
111 } else {
112 Ok(stream)
113 }
114}
115
116pub(crate) struct TransformStream {
117 input: SendableRecordBatchStream,
118 transform: Arc<dyn Transform>,
119 schema: SchemaRef,
120 remote_schema: RemoteSchema,
121}
122
123impl Stream for TransformStream {
124 type Item = DFResult<RecordBatch>;
125 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
126 match self.input.poll_next_unpin(cx) {
127 Poll::Ready(Some(Ok(batch))) => {
128 match transform_batch(batch, self.transform.as_ref(), &self.remote_schema) {
129 Ok(result) => Poll::Ready(Some(Ok(result))),
130 Err(e) => Poll::Ready(Some(Err(e))),
131 }
132 }
133 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
134 Poll::Ready(None) => Poll::Ready(None),
135 Poll::Pending => Poll::Pending,
136 }
137 }
138}
139
140impl RecordBatchStream for TransformStream {
141 fn schema(&self) -> SchemaRef {
142 self.schema.clone()
143 }
144}
145
146impl DisplayAs for RemoteTableExec {
147 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
148 write!(f, "RemoteTableExec")
149 }
150}