datafusion_remote_table/
exec.rs

1use crate::{
2    Connection, ConnectionOptions, DFResult, RemoteSchemaRef, Transform, TransformStream,
3    transform_schema,
4};
5use datafusion::arrow::datatypes::SchemaRef;
6use datafusion::execution::{SendableRecordBatchStream, TaskContext};
7use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
8use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
9use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
10use datafusion::physical_plan::{
11    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, project_schema,
12};
13use futures::TryStreamExt;
14use std::any::Any;
15use std::sync::Arc;
16
/// Physical plan node that runs a SQL query through a [`Connection`] and
/// exposes the result as a DataFusion record-batch stream.
///
/// The node has no child plans. Its [`PlanProperties`] are computed once in
/// `RemoteTableExec::try_new`. They describe a single partition
/// (`Partitioning::UnknownPartitioning(1)`), incremental emission and bounded
/// output.
#[derive(Debug)]
pub struct RemoteTableExec {
    /// Options passed to `Connection::query` when the plan is executed.
    pub(crate) conn_options: ConnectionOptions,
    /// SQL text sent to the connection at execution time.
    pub(crate) sql: String,
    /// Table schema before the optional transform and projection are applied.
    pub(crate) table_schema: SchemaRef,
    /// Optional remote schema. It is forwarded to `transform_schema` at
    /// planning time and to `TransformStream` at execution time.
    pub(crate) remote_schema: Option<RemoteSchemaRef>,
    /// Optional column indices. They are used to project the output schema
    /// and are forwarded to the connection query.
    pub(crate) projection: Option<Vec<usize>>,
    /// Optional transform applied to both the output schema and the record
    /// batches produced by the query.
    pub(crate) transform: Option<Arc<dyn Transform>>,
    /// Connection used to run `sql` when the plan is executed.
    conn: Arc<dyn Connection>,
    /// Cached plan properties (output schema, partitioning, emission, boundedness).
    plan_properties: PlanProperties,
}
28
29impl RemoteTableExec {
30    pub fn try_new(
31        conn_options: ConnectionOptions,
32        sql: String,
33        table_schema: SchemaRef,
34        remote_schema: Option<RemoteSchemaRef>,
35        projection: Option<Vec<usize>>,
36        transform: Option<Arc<dyn Transform>>,
37        conn: Arc<dyn Connection>,
38    ) -> DFResult<Self> {
39        let transformed_table_schema = if let Some(transform) = transform.as_ref() {
40            transform_schema(
41                table_schema.clone(),
42                transform.as_ref(),
43                remote_schema.as_ref(),
44            )?
45        } else {
46            table_schema.clone()
47        };
48        let projected_schema = project_schema(&transformed_table_schema, projection.as_ref())?;
49        let plan_properties = PlanProperties::new(
50            EquivalenceProperties::new(projected_schema),
51            Partitioning::UnknownPartitioning(1),
52            EmissionType::Incremental,
53            Boundedness::Bounded,
54        );
55        Ok(Self {
56            conn_options,
57            sql,
58            table_schema,
59            remote_schema,
60            projection,
61            transform,
62            conn,
63            plan_properties,
64        })
65    }
66}
67
68impl ExecutionPlan for RemoteTableExec {
69    fn name(&self) -> &str {
70        "RemoteTableExec"
71    }
72
73    fn as_any(&self) -> &dyn Any {
74        self
75    }
76
77    fn properties(&self) -> &PlanProperties {
78        &self.plan_properties
79    }
80
81    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
82        vec![]
83    }
84
85    fn with_new_children(
86        self: Arc<Self>,
87        _children: Vec<Arc<dyn ExecutionPlan>>,
88    ) -> DFResult<Arc<dyn ExecutionPlan>> {
89        Ok(self)
90    }
91
92    fn execute(
93        &self,
94        partition: usize,
95        _context: Arc<TaskContext>,
96    ) -> DFResult<SendableRecordBatchStream> {
97        assert_eq!(partition, 0);
98        let schema = self.schema();
99        let fut = build_and_transform_stream(
100            self.conn.clone(),
101            self.conn_options.clone(),
102            self.sql.clone(),
103            self.table_schema.clone(),
104            self.remote_schema.clone(),
105            self.projection.clone(),
106            self.transform.clone(),
107        );
108        let stream = futures::stream::once(fut).try_flatten();
109        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
110    }
111}
112
113async fn build_and_transform_stream(
114    conn: Arc<dyn Connection>,
115    conn_options: ConnectionOptions,
116    sql: String,
117    table_schema: SchemaRef,
118    remote_schema: Option<RemoteSchemaRef>,
119    projection: Option<Vec<usize>>,
120    transform: Option<Arc<dyn Transform>>,
121) -> DFResult<SendableRecordBatchStream> {
122    let stream = conn
123        .query(
124            &conn_options,
125            &sql,
126            table_schema.clone(),
127            projection.as_ref(),
128        )
129        .await?;
130    if let Some(transform) = transform.as_ref() {
131        Ok(Box::pin(TransformStream::try_new(
132            stream,
133            transform.clone(),
134            table_schema,
135            projection,
136            remote_schema,
137        )?))
138    } else {
139        Ok(stream)
140    }
141}
142
143impl DisplayAs for RemoteTableExec {
144    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
145        write!(f, "RemoteTableExec")
146    }
147}