datafusion_remote_table/
exec.rs

1use crate::{
2    Connection, ConnectionOptions, DFResult, RemoteSchemaRef, Transform, TransformStream,
3    transform_schema,
4};
5use datafusion::arrow::datatypes::SchemaRef;
6use datafusion::execution::{SendableRecordBatchStream, TaskContext};
7use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
8use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
9use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
10use datafusion::physical_plan::{
11    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, project_schema,
12};
13use futures::TryStreamExt;
14use std::any::Any;
15use std::sync::Arc;
16
/// Physical plan node that runs a SQL statement through a `Connection` and
/// exposes the result as a record batch stream.
///
/// Instances are built with `RemoteTableExec::try_new`, which also derives the
/// plan properties (output schema, partitioning, emission type, boundedness).
#[derive(Debug)]
pub struct RemoteTableExec {
    /// Options handed to `Connection::query` when the plan is executed.
    pub(crate) conn_options: ConnectionOptions,
    /// SQL text handed to `Connection::query` when the plan is executed.
    pub(crate) sql: String,
    /// Table schema before any transform or projection is applied; passed to
    /// `transform_schema` and to `Connection::query`.
    pub(crate) table_schema: SchemaRef,
    /// Optional remote-side schema; forwarded to `transform_schema` and to
    /// `TransformStream::try_new`.
    pub(crate) remote_schema: Option<RemoteSchemaRef>,
    /// Optional projection applied (via `project_schema`) to the transformed
    /// table schema to obtain the output schema; also forwarded to the query.
    pub(crate) projection: Option<Vec<usize>>,
    /// Optional fetch limit; forwarded to `Connection::query` and reported by
    /// `ExecutionPlan::fetch`.
    pub(crate) limit: Option<usize>,
    /// Optional transform; when present the query stream is wrapped in a
    /// `TransformStream`.
    pub(crate) transform: Option<Arc<dyn Transform>>,
    /// Connection used to run the query.
    conn: Arc<dyn Connection>,
    /// Plan properties computed once in `try_new`.
    plan_properties: PlanProperties,
}
29
30impl RemoteTableExec {
31    #[allow(clippy::too_many_arguments)]
32    pub fn try_new(
33        conn_options: ConnectionOptions,
34        sql: String,
35        table_schema: SchemaRef,
36        remote_schema: Option<RemoteSchemaRef>,
37        projection: Option<Vec<usize>>,
38        limit: Option<usize>,
39        transform: Option<Arc<dyn Transform>>,
40        conn: Arc<dyn Connection>,
41    ) -> DFResult<Self> {
42        let transformed_table_schema = transform_schema(
43            table_schema.clone(),
44            transform.as_ref(),
45            remote_schema.as_ref(),
46        )?;
47        let projected_schema = project_schema(&transformed_table_schema, projection.as_ref())?;
48        let plan_properties = PlanProperties::new(
49            EquivalenceProperties::new(projected_schema),
50            Partitioning::UnknownPartitioning(1),
51            EmissionType::Incremental,
52            Boundedness::Bounded,
53        );
54        Ok(Self {
55            conn_options,
56            sql,
57            table_schema,
58            remote_schema,
59            projection,
60            limit,
61            transform,
62            conn,
63            plan_properties,
64        })
65    }
66}
67
68impl ExecutionPlan for RemoteTableExec {
69    fn name(&self) -> &str {
70        "RemoteTableExec"
71    }
72
73    fn as_any(&self) -> &dyn Any {
74        self
75    }
76
77    fn properties(&self) -> &PlanProperties {
78        &self.plan_properties
79    }
80
81    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
82        vec![]
83    }
84
85    fn with_new_children(
86        self: Arc<Self>,
87        _children: Vec<Arc<dyn ExecutionPlan>>,
88    ) -> DFResult<Arc<dyn ExecutionPlan>> {
89        Ok(self)
90    }
91
92    fn execute(
93        &self,
94        partition: usize,
95        _context: Arc<TaskContext>,
96    ) -> DFResult<SendableRecordBatchStream> {
97        assert_eq!(partition, 0);
98        let schema = self.schema();
99        let fut = build_and_transform_stream(
100            self.conn.clone(),
101            self.conn_options.clone(),
102            self.sql.clone(),
103            self.table_schema.clone(),
104            self.remote_schema.clone(),
105            self.projection.clone(),
106            self.limit,
107            self.transform.clone(),
108        );
109        let stream = futures::stream::once(fut).try_flatten();
110        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
111    }
112
113    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
114        Some(Arc::new(Self {
115            conn_options: self.conn_options.clone(),
116            sql: self.sql.clone(),
117            table_schema: self.table_schema.clone(),
118            remote_schema: self.remote_schema.clone(),
119            projection: self.projection.clone(),
120            limit,
121            transform: self.transform.clone(),
122            conn: self.conn.clone(),
123            plan_properties: self.plan_properties.clone(),
124        }))
125    }
126
127    fn fetch(&self) -> Option<usize> {
128        self.limit
129    }
130}
131
132#[allow(clippy::too_many_arguments)]
133async fn build_and_transform_stream(
134    conn: Arc<dyn Connection>,
135    conn_options: ConnectionOptions,
136    sql: String,
137    table_schema: SchemaRef,
138    remote_schema: Option<RemoteSchemaRef>,
139    projection: Option<Vec<usize>>,
140    limit: Option<usize>,
141    transform: Option<Arc<dyn Transform>>,
142) -> DFResult<SendableRecordBatchStream> {
143    let stream = conn
144        .query(
145            &conn_options,
146            &sql,
147            table_schema.clone(),
148            projection.as_ref(),
149            limit,
150        )
151        .await?;
152    if let Some(transform) = transform.as_ref() {
153        Ok(Box::pin(TransformStream::try_new(
154            stream,
155            transform.clone(),
156            table_schema,
157            projection,
158            remote_schema,
159        )?))
160    } else {
161        Ok(stream)
162    }
163}
164
165impl DisplayAs for RemoteTableExec {
166    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
167        write!(f, "RemoteTableExec: limit={:?}", self.limit)
168    }
169}