datafusion_remote_table/
exec.rs

1use crate::transform::transform_batch;
2use crate::{connect, ConnectionOptions, DFResult, RemoteSchema, Transform};
3use datafusion::arrow::array::RecordBatch;
4use datafusion::arrow::datatypes::SchemaRef;
5use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
6use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
7use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
8use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
9use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
10use futures::{Stream, StreamExt, TryStreamExt};
11use std::any::Any;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context, Poll};
15
/// Physical plan node that runs a SQL statement against a remote connection
/// and exposes the result as a single-partition record batch stream.
#[derive(Debug)]
pub struct RemoteTableExec {
    /// Options handed to `connect` when the plan is executed.
    conn_options: ConnectionOptions,
    /// SQL text passed to the connection's `query` call.
    sql: String,
    /// Optional projection passed through to the connection's `query` call.
    projection: Option<Vec<usize>>,
    /// Optional transform applied to every batch coming back from the remote side.
    pub(crate) transform: Option<Arc<dyn Transform>>,
    /// Plan properties returned by `ExecutionPlan::properties`.
    plan_properties: PlanProperties,
}
24
25impl RemoteTableExec {
26    pub async fn try_new(
27        conn_options: ConnectionOptions,
28        projected_schema: SchemaRef,
29        sql: String,
30        projection: Option<Vec<usize>>,
31        transform: Option<Arc<dyn Transform>>,
32    ) -> DFResult<Self> {
33        let plan_properties = PlanProperties::new(
34            EquivalenceProperties::new(projected_schema),
35            Partitioning::UnknownPartitioning(1),
36            EmissionType::Incremental,
37            Boundedness::Bounded,
38        );
39        Ok(Self {
40            conn_options,
41            sql,
42            projection,
43            transform,
44            plan_properties,
45        })
46    }
47}
48
49impl ExecutionPlan for RemoteTableExec {
50    fn name(&self) -> &str {
51        "RemoteTableExec"
52    }
53
54    fn as_any(&self) -> &dyn Any {
55        self
56    }
57
58    fn properties(&self) -> &PlanProperties {
59        &self.plan_properties
60    }
61
62    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
63        vec![]
64    }
65
66    fn with_new_children(
67        self: Arc<Self>,
68        _children: Vec<Arc<dyn ExecutionPlan>>,
69    ) -> DFResult<Arc<dyn ExecutionPlan>> {
70        Ok(self)
71    }
72
73    fn execute(
74        &self,
75        partition: usize,
76        _context: Arc<TaskContext>,
77    ) -> DFResult<SendableRecordBatchStream> {
78        assert_eq!(partition, 0);
79        let schema = self.schema();
80        let fut = build_and_transform_stream(
81            self.conn_options.clone(),
82            self.sql.clone(),
83            self.projection.clone(),
84            self.transform.clone(),
85            schema.clone(),
86        );
87        let stream = futures::stream::once(fut).try_flatten();
88        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
89    }
90}
91
92async fn build_and_transform_stream(
93    conn_options: ConnectionOptions,
94    sql: String,
95    projection: Option<Vec<usize>>,
96    transform: Option<Arc<dyn Transform>>,
97    schema: SchemaRef,
98) -> DFResult<SendableRecordBatchStream> {
99    let conn = connect(&conn_options).await?;
100    let (stream, remote_schema) = conn.query(sql, projection).await?;
101    assert_eq!(schema.fields().len(), remote_schema.fields.len());
102    if let Some(transform) = transform.as_ref() {
103        Ok(Box::pin(TransformStream {
104            input: stream,
105            transform: transform.clone(),
106            schema,
107            remote_schema,
108        }))
109    } else {
110        Ok(stream)
111    }
112}
113
/// Stream adapter that runs every batch of `input` through `transform_batch`.
pub(crate) struct TransformStream {
    /// Upstream record batch stream whose batches are transformed.
    input: SendableRecordBatchStream,
    /// Transform handed to `transform_batch` for each batch.
    transform: Arc<dyn Transform>,
    /// Schema reported by this stream's `RecordBatchStream::schema`.
    schema: SchemaRef,
    /// Remote schema handed to `transform_batch` alongside each batch.
    remote_schema: RemoteSchema,
}
120
121impl Stream for TransformStream {
122    type Item = DFResult<RecordBatch>;
123    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
124        match self.input.poll_next_unpin(cx) {
125            Poll::Ready(Some(Ok(batch))) => {
126                match transform_batch(batch, self.transform.as_ref(), &self.remote_schema) {
127                    Ok(result) => Poll::Ready(Some(Ok(result))),
128                    Err(e) => Poll::Ready(Some(Err(e))),
129                }
130            }
131            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
132            Poll::Ready(None) => Poll::Ready(None),
133            Poll::Pending => Poll::Pending,
134        }
135    }
136}
137
138impl RecordBatchStream for TransformStream {
139    fn schema(&self) -> SchemaRef {
140        self.schema.clone()
141    }
142}
143
144impl DisplayAs for RemoteTableExec {
145    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
146        write!(f, "RemoteTableExec")
147    }
148}