datafusion_remote_table/
exec.rs

1use crate::{
2    Connection, ConnectionOptions, DFResult, RemoteSchemaRef, Transform, TransformStream,
3    transform_schema,
4};
5use datafusion::arrow::datatypes::SchemaRef;
6use datafusion::common::Column;
7use datafusion::common::tree_node::{Transformed, TreeNode};
8use datafusion::execution::{SendableRecordBatchStream, TaskContext};
9use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
10use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
11use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
12use datafusion::physical_plan::{
13    DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, project_schema,
14};
15use datafusion::prelude::Expr;
16use futures::TryStreamExt;
17use std::any::Any;
18use std::sync::Arc;
19
/// Physical plan node that executes a SQL query against a remote database and
/// streams the results back as Arrow record batches.
#[derive(Debug)]
pub struct RemoteTableExec {
    /// Options used to connect to the remote database.
    pub(crate) conn_options: ConnectionOptions,
    /// SQL statement to execute on the remote side.
    pub(crate) sql: String,
    /// Arrow schema of the remote table as registered locally (pre-transform).
    pub(crate) table_schema: SchemaRef,
    /// Remote-side schema, when known; passed through to schema transformation
    /// and the transform stream.
    pub(crate) remote_schema: Option<RemoteSchemaRef>,
    /// Indices of columns to project out of the (transformed) table schema.
    pub(crate) projection: Option<Vec<usize>>,
    /// Filter expressions forwarded to the remote query.
    pub(crate) filters: Vec<Expr>,
    /// Optional row limit; only honored when the DB type supports rewriting
    /// the SQL with filters/limit (see `with_fetch`).
    pub(crate) limit: Option<usize>,
    /// Optional transform applied to fetched batches.
    pub(crate) transform: Option<Arc<dyn Transform>>,
    // Connection handle used to run the query.
    conn: Arc<dyn Connection>,
    // Cached plan properties (projected schema, single partition, bounded).
    plan_properties: PlanProperties,
}
33
34impl RemoteTableExec {
35    #[allow(clippy::too_many_arguments)]
36    pub fn try_new(
37        conn_options: ConnectionOptions,
38        sql: String,
39        table_schema: SchemaRef,
40        remote_schema: Option<RemoteSchemaRef>,
41        projection: Option<Vec<usize>>,
42        filters: Vec<Expr>,
43        limit: Option<usize>,
44        transform: Option<Arc<dyn Transform>>,
45        conn: Arc<dyn Connection>,
46    ) -> DFResult<Self> {
47        let transformed_table_schema = transform_schema(
48            table_schema.clone(),
49            transform.as_ref(),
50            remote_schema.as_ref(),
51        )?;
52        let projected_schema = project_schema(&transformed_table_schema, projection.as_ref())?;
53        let plan_properties = PlanProperties::new(
54            EquivalenceProperties::new(projected_schema),
55            Partitioning::UnknownPartitioning(1),
56            EmissionType::Incremental,
57            Boundedness::Bounded,
58        );
59        Ok(Self {
60            conn_options,
61            sql,
62            table_schema,
63            remote_schema,
64            projection,
65            filters,
66            limit,
67            transform,
68            conn,
69            plan_properties,
70        })
71    }
72}
73
74impl ExecutionPlan for RemoteTableExec {
75    fn name(&self) -> &str {
76        "RemoteTableExec"
77    }
78
79    fn as_any(&self) -> &dyn Any {
80        self
81    }
82
83    fn properties(&self) -> &PlanProperties {
84        &self.plan_properties
85    }
86
87    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
88        vec![]
89    }
90
91    fn with_new_children(
92        self: Arc<Self>,
93        _children: Vec<Arc<dyn ExecutionPlan>>,
94    ) -> DFResult<Arc<dyn ExecutionPlan>> {
95        Ok(self)
96    }
97
98    fn execute(
99        &self,
100        partition: usize,
101        _context: Arc<TaskContext>,
102    ) -> DFResult<SendableRecordBatchStream> {
103        assert_eq!(partition, 0);
104        let schema = self.schema();
105        let fut = build_and_transform_stream(
106            self.conn.clone(),
107            self.conn_options.clone(),
108            self.sql.clone(),
109            self.table_schema.clone(),
110            self.remote_schema.clone(),
111            self.projection.clone(),
112            self.filters.clone(),
113            self.limit,
114            self.transform.clone(),
115        );
116        let stream = futures::stream::once(fut).try_flatten();
117        Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
118    }
119
120    fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
121        if self
122            .conn_options
123            .db_type()
124            .support_rewrite_with_filters_limit(&self.sql)
125        {
126            Some(Arc::new(Self {
127                conn_options: self.conn_options.clone(),
128                sql: self.sql.clone(),
129                table_schema: self.table_schema.clone(),
130                remote_schema: self.remote_schema.clone(),
131                projection: self.projection.clone(),
132                filters: self.filters.clone(),
133                limit,
134                transform: self.transform.clone(),
135                conn: self.conn.clone(),
136                plan_properties: self.plan_properties.clone(),
137            }))
138        } else {
139            None
140        }
141    }
142
143    fn fetch(&self) -> Option<usize> {
144        self.limit
145    }
146}
147
148#[allow(clippy::too_many_arguments)]
149async fn build_and_transform_stream(
150    conn: Arc<dyn Connection>,
151    conn_options: ConnectionOptions,
152    sql: String,
153    table_schema: SchemaRef,
154    remote_schema: Option<RemoteSchemaRef>,
155    projection: Option<Vec<usize>>,
156    filters: Vec<Expr>,
157    limit: Option<usize>,
158    transform: Option<Arc<dyn Transform>>,
159) -> DFResult<SendableRecordBatchStream> {
160    let transformed_table_schema = transform_schema(
161        table_schema.clone(),
162        transform.as_ref(),
163        remote_schema.as_ref(),
164    )?;
165
166    let rewritten_filters =
167        rewrite_filters_column(filters, &table_schema, &transformed_table_schema)?;
168
169    let limit = if conn_options
170        .db_type()
171        .support_rewrite_with_filters_limit(&sql)
172    {
173        limit
174    } else {
175        None
176    };
177
178    let stream = conn
179        .query(
180            &conn_options,
181            &sql,
182            table_schema.clone(),
183            projection.as_ref(),
184            rewritten_filters.as_slice(),
185            limit,
186        )
187        .await?;
188
189    if let Some(transform) = transform.as_ref() {
190        Ok(Box::pin(TransformStream::try_new(
191            stream,
192            transform.clone(),
193            table_schema,
194            projection,
195            remote_schema,
196        )?))
197    } else {
198        Ok(stream)
199    }
200}
201
202fn rewrite_filters_column(
203    filters: Vec<Expr>,
204    table_schema: &SchemaRef,
205    transformed_table_schema: &SchemaRef,
206) -> DFResult<Vec<Expr>> {
207    filters
208        .into_iter()
209        .map(|f| {
210            f.transform_down(|e| {
211                if let Expr::Column(col) = e {
212                    let col_idx = transformed_table_schema.index_of(col.name())?;
213                    let row_name = table_schema.field(col_idx).name().to_string();
214                    Ok(Transformed::yes(Expr::Column(Column::new_unqualified(
215                        row_name,
216                    ))))
217                } else {
218                    Ok(Transformed::no(e))
219                }
220            })
221            .map(|trans| trans.data)
222        })
223        .collect::<DFResult<Vec<_>>>()
224}
225
226impl DisplayAs for RemoteTableExec {
227    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
228        write!(
229            f,
230            "RemoteTableExec: limit={:?}, filters=[{}]",
231            self.limit,
232            self.filters
233                .iter()
234                .map(|e| format!("{e}"))
235                .collect::<Vec<_>>()
236                .join(", ")
237        )
238    }
239}