// datafusion_remote_table/exec.rs

1use crate::{
2    Connection, ConnectionOptions, DFResult, RemoteSchemaRef, Transform, TransformStream,
3    transform_schema,
4};
5use datafusion::arrow::datatypes::SchemaRef;
6use datafusion::execution::{SendableRecordBatchStream, TaskContext};
7use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
8use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
9use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
10use datafusion::physical_plan::{
11    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, project_schema,
12};
13use futures::TryStreamExt;
14use std::any::Any;
15use std::sync::Arc;
16
/// Physical plan node that executes a SQL statement against a remote
/// database and streams the results back as Arrow record batches.
#[derive(Debug)]
pub struct RemoteTableExec {
    /// Options used to connect to the remote database.
    pub(crate) conn_options: ConnectionOptions,
    /// The SQL statement to run on the remote side.
    pub(crate) sql: String,
    /// Locally-declared schema of the remote table (before transform/projection).
    pub(crate) table_schema: SchemaRef,
    /// Schema reported by the remote source, when available.
    pub(crate) remote_schema: Option<RemoteSchemaRef>,
    /// Optional column projection: indices into the transformed table schema.
    pub(crate) projection: Option<Vec<usize>>,
    /// Filters already rendered as SQL strings, pushed down to the remote query.
    pub(crate) unparsed_filters: Vec<String>,
    /// Optional row limit; pushed down only when the db type supports rewriting
    /// the query with it (see `with_fetch`).
    pub(crate) limit: Option<usize>,
    /// Transformation applied to each batch after it is fetched.
    pub(crate) transform: Arc<dyn Transform>,
    // Connection handle used to issue the query at execute time.
    conn: Arc<dyn Connection>,
    // Cached properties (output schema, partitioning, emission, boundedness)
    // computed once in `try_new`.
    plan_properties: PlanProperties,
}
30
31impl RemoteTableExec {
32    #[allow(clippy::too_many_arguments)]
33    pub fn try_new(
34        conn_options: ConnectionOptions,
35        sql: String,
36        table_schema: SchemaRef,
37        remote_schema: Option<RemoteSchemaRef>,
38        projection: Option<Vec<usize>>,
39        unparsed_filters: Vec<String>,
40        limit: Option<usize>,
41        transform: Arc<dyn Transform>,
42        conn: Arc<dyn Connection>,
43    ) -> DFResult<Self> {
44        let transformed_table_schema = transform_schema(
45            table_schema.clone(),
46            transform.as_ref(),
47            remote_schema.as_ref(),
48        )?;
49        let projected_schema = project_schema(&transformed_table_schema, projection.as_ref())?;
50        let plan_properties = PlanProperties::new(
51            EquivalenceProperties::new(projected_schema),
52            Partitioning::UnknownPartitioning(1),
53            EmissionType::Incremental,
54            Boundedness::Bounded,
55        );
56        Ok(Self {
57            conn_options,
58            sql,
59            table_schema,
60            remote_schema,
61            projection,
62            unparsed_filters,
63            limit,
64            transform,
65            conn,
66            plan_properties,
67        })
68    }
69}
70
71impl ExecutionPlan for RemoteTableExec {
72    fn name(&self) -> &str {
73        "RemoteTableExec"
74    }
75
76    fn as_any(&self) -> &dyn Any {
77        self
78    }
79
80    fn properties(&self) -> &PlanProperties {
81        &self.plan_properties
82    }
83
84    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
85        vec![]
86    }
87
88    fn with_new_children(
89        self: Arc<Self>,
90        _children: Vec<Arc<dyn ExecutionPlan>>,
91    ) -> DFResult<Arc<dyn ExecutionPlan>> {
92        Ok(self)
93    }
94
95    fn execute(
96        &self,
97        partition: usize,
98        _context: Arc<TaskContext>,
99    ) -> DFResult<SendableRecordBatchStream> {
100        assert_eq!(partition, 0);
101        let schema = self.schema();
102        let fut = build_and_transform_stream(
103            self.conn.clone(),
104            self.conn_options.clone(),
105            self.sql.clone(),
106            self.table_schema.clone(),
107            self.remote_schema.clone(),
108            self.projection.clone(),
109            self.unparsed_filters.clone(),
110            self.limit,
111            self.transform.clone(),
112        );
113        let stream = futures::stream::once(fut).try_flatten();
114        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
115    }
116
117    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
118        if self
119            .conn_options
120            .db_type()
121            .support_rewrite_with_filters_limit(&self.sql)
122        {
123            Some(Arc::new(Self {
124                conn_options: self.conn_options.clone(),
125                sql: self.sql.clone(),
126                table_schema: self.table_schema.clone(),
127                remote_schema: self.remote_schema.clone(),
128                projection: self.projection.clone(),
129                unparsed_filters: self.unparsed_filters.clone(),
130                limit,
131                transform: self.transform.clone(),
132                conn: self.conn.clone(),
133                plan_properties: self.plan_properties.clone(),
134            }))
135        } else {
136            None
137        }
138    }
139
140    fn fetch(&self) -> Option<usize> {
141        self.limit
142    }
143}
144
145#[allow(clippy::too_many_arguments)]
146async fn build_and_transform_stream(
147    conn: Arc<dyn Connection>,
148    conn_options: ConnectionOptions,
149    sql: String,
150    table_schema: SchemaRef,
151    remote_schema: Option<RemoteSchemaRef>,
152    projection: Option<Vec<usize>>,
153    unparsed_filters: Vec<String>,
154    limit: Option<usize>,
155    transform: Arc<dyn Transform>,
156) -> DFResult<SendableRecordBatchStream> {
157    let limit = if conn_options
158        .db_type()
159        .support_rewrite_with_filters_limit(&sql)
160    {
161        limit
162    } else {
163        None
164    };
165
166    let stream = conn
167        .query(
168            &conn_options,
169            &sql,
170            table_schema.clone(),
171            projection.as_ref(),
172            unparsed_filters.as_slice(),
173            limit,
174        )
175        .await?;
176
177    Ok(Box::pin(TransformStream::try_new(
178        stream,
179        transform.clone(),
180        table_schema,
181        projection,
182        remote_schema,
183    )?))
184}
185
186impl DisplayAs for RemoteTableExec {
187    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
188        write!(
189            f,
190            "RemoteTableExec: limit={:?}, filters=[{}]",
191            self.limit,
192            self.unparsed_filters.join(", ")
193        )
194    }
195}