// datafusion_remote_table/exec.rs

1use crate::{Connection, ConnectionOptions, DFResult, RemoteSchemaRef, Transform, TransformStream};
2use datafusion::arrow::datatypes::SchemaRef;
3use datafusion::execution::{SendableRecordBatchStream, TaskContext};
4use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
5use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
6use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
7use datafusion::physical_plan::{
8    project_schema, DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
9};
10use futures::TryStreamExt;
11use std::any::Any;
12use std::sync::Arc;
13
14#[derive(Debug)]
15pub struct RemoteTableExec {
16    pub(crate) conn_options: ConnectionOptions,
17    pub(crate) sql: String,
18    pub(crate) table_schema: SchemaRef,
19    pub(crate) remote_schema: Option<RemoteSchemaRef>,
20    pub(crate) projection: Option<Vec<usize>>,
21    pub(crate) transform: Option<Arc<dyn Transform>>,
22    conn: Arc<dyn Connection>,
23    plan_properties: PlanProperties,
24}
25
26impl RemoteTableExec {
27    pub fn try_new(
28        conn_options: ConnectionOptions,
29        sql: String,
30        table_schema: SchemaRef,
31        remote_schema: Option<RemoteSchemaRef>,
32        projection: Option<Vec<usize>>,
33        transform: Option<Arc<dyn Transform>>,
34        conn: Arc<dyn Connection>,
35    ) -> DFResult<Self> {
36        let projected_schema = project_schema(&table_schema, projection.as_ref())?;
37        let plan_properties = PlanProperties::new(
38            EquivalenceProperties::new(projected_schema),
39            Partitioning::UnknownPartitioning(1),
40            EmissionType::Incremental,
41            Boundedness::Bounded,
42        );
43        Ok(Self {
44            conn_options,
45            sql,
46            table_schema,
47            remote_schema,
48            projection,
49            transform,
50            conn,
51            plan_properties,
52        })
53    }
54}
55
56impl ExecutionPlan for RemoteTableExec {
57    fn name(&self) -> &str {
58        "RemoteTableExec"
59    }
60
61    fn as_any(&self) -> &dyn Any {
62        self
63    }
64
65    fn properties(&self) -> &PlanProperties {
66        &self.plan_properties
67    }
68
69    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
70        vec![]
71    }
72
73    fn with_new_children(
74        self: Arc<Self>,
75        _children: Vec<Arc<dyn ExecutionPlan>>,
76    ) -> DFResult<Arc<dyn ExecutionPlan>> {
77        Ok(self)
78    }
79
80    fn execute(
81        &self,
82        partition: usize,
83        _context: Arc<TaskContext>,
84    ) -> DFResult<SendableRecordBatchStream> {
85        assert_eq!(partition, 0);
86        let schema = self.schema();
87        let fut = build_and_transform_stream(
88            self.conn.clone(),
89            self.sql.clone(),
90            self.table_schema.clone(),
91            self.remote_schema.clone(),
92            self.projection.clone(),
93            self.transform.clone(),
94        );
95        let stream = futures::stream::once(fut).try_flatten();
96        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
97    }
98}
99
100async fn build_and_transform_stream(
101    conn: Arc<dyn Connection>,
102    sql: String,
103    table_schema: SchemaRef,
104    remote_schema: Option<RemoteSchemaRef>,
105    projection: Option<Vec<usize>>,
106    transform: Option<Arc<dyn Transform>>,
107) -> DFResult<SendableRecordBatchStream> {
108    let stream = conn
109        .query(sql, table_schema.clone(), projection.clone())
110        .await?;
111    if let Some(transform) = transform.as_ref() {
112        Ok(Box::pin(TransformStream::try_new(
113            stream,
114            transform.clone(),
115            table_schema,
116            projection,
117            remote_schema,
118        )?))
119    } else {
120        Ok(stream)
121    }
122}
123
124impl DisplayAs for RemoteTableExec {
125    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
126        write!(f, "RemoteTableExec")
127    }
128}