datafusion_remote_table/
exec.rs

1use crate::transform::transform_batch;
2use crate::{Connection, ConnectionOptions, DFResult, RemoteSchema, Transform};
3use datafusion::arrow::array::RecordBatch;
4use datafusion::arrow::datatypes::SchemaRef;
5use datafusion::execution::{RecordBatchStream, SendableRecordBatchStream, TaskContext};
6use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
7use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
8use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
9use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
10use futures::{Stream, StreamExt, TryStreamExt};
11use std::any::Any;
12use std::pin::Pin;
13use std::sync::Arc;
14use std::task::{Context, Poll};
15
/// Physical plan node that runs a SQL statement on a remote connection and
/// exposes the result as a record batch stream.
#[derive(Debug)]
pub struct RemoteTableExec {
    /// Connection options supplied by the caller at construction time.
    pub(crate) conn_options: ConnectionOptions,
    /// SQL text handed to [`Connection::query`] when the plan is executed.
    pub(crate) sql: String,
    /// Optional column projection, forwarded to [`Connection::query`] as-is.
    pub(crate) projection: Option<Vec<usize>>,
    /// Optional transform applied to every batch returned by the remote query.
    pub(crate) transform: Option<Arc<dyn Transform>>,
    /// Connection used to issue the remote query.
    conn: Arc<dyn Connection>,
    /// Plan properties computed once in [`RemoteTableExec::new`].
    plan_properties: PlanProperties,
}
25
26impl RemoteTableExec {
27    pub fn new(
28        conn_options: ConnectionOptions,
29        projected_schema: SchemaRef,
30        sql: String,
31        projection: Option<Vec<usize>>,
32        transform: Option<Arc<dyn Transform>>,
33        conn: Arc<dyn Connection>,
34    ) -> Self {
35        let plan_properties = PlanProperties::new(
36            EquivalenceProperties::new(projected_schema),
37            Partitioning::UnknownPartitioning(1),
38            EmissionType::Incremental,
39            Boundedness::Bounded,
40        );
41        Self {
42            conn_options,
43            sql,
44            projection,
45            transform,
46            conn,
47            plan_properties,
48        }
49    }
50}
51
52impl ExecutionPlan for RemoteTableExec {
53    fn name(&self) -> &str {
54        "RemoteTableExec"
55    }
56
57    fn as_any(&self) -> &dyn Any {
58        self
59    }
60
61    fn properties(&self) -> &PlanProperties {
62        &self.plan_properties
63    }
64
65    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
66        vec![]
67    }
68
69    fn with_new_children(
70        self: Arc<Self>,
71        _children: Vec<Arc<dyn ExecutionPlan>>,
72    ) -> DFResult<Arc<dyn ExecutionPlan>> {
73        Ok(self)
74    }
75
76    fn execute(
77        &self,
78        partition: usize,
79        _context: Arc<TaskContext>,
80    ) -> DFResult<SendableRecordBatchStream> {
81        assert_eq!(partition, 0);
82        let schema = self.schema();
83        let fut = build_and_transform_stream(
84            self.conn.clone(),
85            self.sql.clone(),
86            self.projection.clone(),
87            self.transform.clone(),
88            schema.clone(),
89        );
90        let stream = futures::stream::once(fut).try_flatten();
91        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
92    }
93}
94
95async fn build_and_transform_stream(
96    conn: Arc<dyn Connection>,
97    sql: String,
98    projection: Option<Vec<usize>>,
99    transform: Option<Arc<dyn Transform>>,
100    schema: SchemaRef,
101) -> DFResult<SendableRecordBatchStream> {
102    let (stream, remote_schema) = conn.query(sql, projection).await?;
103    assert_eq!(schema.fields().len(), remote_schema.fields.len());
104    if let Some(transform) = transform.as_ref() {
105        Ok(Box::pin(TransformStream {
106            input: stream,
107            transform: transform.clone(),
108            schema,
109            remote_schema,
110        }))
111    } else {
112        Ok(stream)
113    }
114}
115
/// Stream adapter that passes every batch from `input` through
/// `transform_batch` before yielding it.
pub(crate) struct TransformStream {
    /// Upstream batch stream returned by the remote query.
    input: SendableRecordBatchStream,
    /// Transform handed to `transform_batch` for each batch.
    transform: Arc<dyn Transform>,
    /// Schema reported by `RecordBatchStream::schema` for this stream.
    schema: SchemaRef,
    /// Remote schema handed to `transform_batch` alongside each batch.
    remote_schema: RemoteSchema,
}
122
123impl Stream for TransformStream {
124    type Item = DFResult<RecordBatch>;
125    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
126        match self.input.poll_next_unpin(cx) {
127            Poll::Ready(Some(Ok(batch))) => {
128                match transform_batch(batch, self.transform.as_ref(), &self.remote_schema) {
129                    Ok(result) => Poll::Ready(Some(Ok(result))),
130                    Err(e) => Poll::Ready(Some(Err(e))),
131                }
132            }
133            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
134            Poll::Ready(None) => Poll::Ready(None),
135            Poll::Pending => Poll::Pending,
136        }
137    }
138}
139
140impl RecordBatchStream for TransformStream {
141    fn schema(&self) -> SchemaRef {
142        self.schema.clone()
143    }
144}
145
146impl DisplayAs for RemoteTableExec {
147    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
148        write!(f, "RemoteTableExec")
149    }
150}