datafusion_remote_table/
exec.rs

1use crate::{Connection, ConnectionOptions, DFResult, RemoteSchemaRef, Transform, TransformStream};
2use datafusion::arrow::datatypes::SchemaRef;
3use datafusion::execution::{SendableRecordBatchStream, TaskContext};
4use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
5use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
6use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
7use datafusion::physical_plan::{
8    project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
9};
10use futures::TryStreamExt;
11use std::any::Any;
12use std::sync::Arc;
13
/// Leaf physical plan node that runs a SQL query over a remote [`Connection`]
/// and exposes the result as a DataFusion record batch stream.
///
/// Construct it with [`RemoteTableExec::try_new`], which also computes the
/// cached [`PlanProperties`].
#[derive(Debug)]
pub struct RemoteTableExec {
    /// Options handed to `Connection::query` when the plan executes.
    pub(crate) conn_options: ConnectionOptions,
    /// SQL text sent to the remote connection.
    pub(crate) sql: String,
    /// Full (unprojected) schema of the remote table; `projection` is applied
    /// on top of it to derive this node's output schema.
    pub(crate) table_schema: SchemaRef,
    /// Optional remote-side schema; only forwarded to `TransformStream` when a
    /// transform is configured (NOTE(review): exact semantics defined by `Transform`).
    pub(crate) remote_schema: Option<RemoteSchemaRef>,
    /// Column indices into `table_schema` to output, or `None` for all columns.
    pub(crate) projection: Option<Vec<usize>>,
    /// Optional transform wrapped around the raw query stream via `TransformStream`.
    pub(crate) transform: Option<Arc<dyn Transform>>,
    /// Connection used to issue the query.
    conn: Arc<dyn Connection>,
    /// Plan properties computed once in `try_new` (projected schema, a single
    /// partition, incremental emission, bounded input).
    plan_properties: PlanProperties,
}
25
26impl RemoteTableExec {
27    pub fn try_new(
28        conn_options: ConnectionOptions,
29        sql: String,
30        table_schema: SchemaRef,
31        remote_schema: Option<RemoteSchemaRef>,
32        projection: Option<Vec<usize>>,
33        transform: Option<Arc<dyn Transform>>,
34        conn: Arc<dyn Connection>,
35    ) -> DFResult<Self> {
36        let projected_schema = project_schema(&table_schema, projection.as_ref())?;
37        let plan_properties = PlanProperties::new(
38            EquivalenceProperties::new(projected_schema),
39            Partitioning::UnknownPartitioning(1),
40            EmissionType::Incremental,
41            Boundedness::Bounded,
42        );
43        Ok(Self {
44            conn_options,
45            sql,
46            table_schema,
47            remote_schema,
48            projection,
49            transform,
50            conn,
51            plan_properties,
52        })
53    }
54}
55
56impl ExecutionPlan for RemoteTableExec {
57    fn name(&self) -> &str {
58        "RemoteTableExec"
59    }
60
61    fn as_any(&self) -> &dyn Any {
62        self
63    }
64
65    fn properties(&self) -> &PlanProperties {
66        &self.plan_properties
67    }
68
69    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
70        vec![]
71    }
72
73    fn with_new_children(
74        self: Arc<Self>,
75        _children: Vec<Arc<dyn ExecutionPlan>>,
76    ) -> DFResult<Arc<dyn ExecutionPlan>> {
77        Ok(self)
78    }
79
80    fn execute(
81        &self,
82        partition: usize,
83        _context: Arc<TaskContext>,
84    ) -> DFResult<SendableRecordBatchStream> {
85        assert_eq!(partition, 0);
86        let schema = self.schema();
87        let fut = build_and_transform_stream(
88            self.conn.clone(),
89            self.conn_options.clone(),
90            self.sql.clone(),
91            self.table_schema.clone(),
92            self.remote_schema.clone(),
93            self.projection.clone(),
94            self.transform.clone(),
95        );
96        let stream = futures::stream::once(fut).try_flatten();
97        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
98    }
99}
100
101async fn build_and_transform_stream(
102    conn: Arc<dyn Connection>,
103    conn_options: ConnectionOptions,
104    sql: String,
105    table_schema: SchemaRef,
106    remote_schema: Option<RemoteSchemaRef>,
107    projection: Option<Vec<usize>>,
108    transform: Option<Arc<dyn Transform>>,
109) -> DFResult<SendableRecordBatchStream> {
110    let stream = conn
111        .query(
112            &conn_options,
113            &sql,
114            table_schema.clone(),
115            projection.as_ref(),
116        )
117        .await?;
118    if let Some(transform) = transform.as_ref() {
119        Ok(Box::pin(TransformStream::try_new(
120            stream,
121            transform.clone(),
122            table_schema,
123            projection,
124            remote_schema,
125        )?))
126    } else {
127        Ok(stream)
128    }
129}
130
131impl DisplayAs for RemoteTableExec {
132    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
133        write!(f, "RemoteTableExec")
134    }
135}