1use crate::{
2 Connection, ConnectionOptions, DFResult, RemoteSchemaRef, Transform, TransformStream,
3 transform_schema,
4};
5use datafusion::arrow::datatypes::SchemaRef;
6use datafusion::common::Column;
7use datafusion::common::tree_node::{Transformed, TreeNode};
8use datafusion::execution::{SendableRecordBatchStream, TaskContext};
9use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
10use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
11use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
12use datafusion::physical_plan::{
13 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, project_schema,
14};
15use datafusion::prelude::Expr;
16use futures::TryStreamExt;
17use std::any::Any;
18use std::sync::Arc;
19
20#[derive(Debug)]
21pub struct RemoteTableExec {
22 pub(crate) conn_options: ConnectionOptions,
23 pub(crate) sql: String,
24 pub(crate) table_schema: SchemaRef,
25 pub(crate) remote_schema: Option<RemoteSchemaRef>,
26 pub(crate) projection: Option<Vec<usize>>,
27 pub(crate) filters: Vec<Expr>,
28 pub(crate) limit: Option<usize>,
29 pub(crate) transform: Option<Arc<dyn Transform>>,
30 conn: Arc<dyn Connection>,
31 plan_properties: PlanProperties,
32}
33
34impl RemoteTableExec {
35 #[allow(clippy::too_many_arguments)]
36 pub fn try_new(
37 conn_options: ConnectionOptions,
38 sql: String,
39 table_schema: SchemaRef,
40 remote_schema: Option<RemoteSchemaRef>,
41 projection: Option<Vec<usize>>,
42 filters: Vec<Expr>,
43 limit: Option<usize>,
44 transform: Option<Arc<dyn Transform>>,
45 conn: Arc<dyn Connection>,
46 ) -> DFResult<Self> {
47 let transformed_table_schema = transform_schema(
48 table_schema.clone(),
49 transform.as_ref(),
50 remote_schema.as_ref(),
51 )?;
52 let projected_schema = project_schema(&transformed_table_schema, projection.as_ref())?;
53 let plan_properties = PlanProperties::new(
54 EquivalenceProperties::new(projected_schema),
55 Partitioning::UnknownPartitioning(1),
56 EmissionType::Incremental,
57 Boundedness::Bounded,
58 );
59 Ok(Self {
60 conn_options,
61 sql,
62 table_schema,
63 remote_schema,
64 projection,
65 filters,
66 limit,
67 transform,
68 conn,
69 plan_properties,
70 })
71 }
72}
73
74impl ExecutionPlan for RemoteTableExec {
75 fn name(&self) -> &str {
76 "RemoteTableExec"
77 }
78
79 fn as_any(&self) -> &dyn Any {
80 self
81 }
82
83 fn properties(&self) -> &PlanProperties {
84 &self.plan_properties
85 }
86
87 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
88 vec![]
89 }
90
91 fn with_new_children(
92 self: Arc<Self>,
93 _children: Vec<Arc<dyn ExecutionPlan>>,
94 ) -> DFResult<Arc<dyn ExecutionPlan>> {
95 Ok(self)
96 }
97
98 fn execute(
99 &self,
100 partition: usize,
101 _context: Arc<TaskContext>,
102 ) -> DFResult<SendableRecordBatchStream> {
103 assert_eq!(partition, 0);
104 let schema = self.schema();
105 let fut = build_and_transform_stream(
106 self.conn.clone(),
107 self.conn_options.clone(),
108 self.sql.clone(),
109 self.table_schema.clone(),
110 self.remote_schema.clone(),
111 self.projection.clone(),
112 self.filters.clone(),
113 self.limit,
114 self.transform.clone(),
115 );
116 let stream = futures::stream::once(fut).try_flatten();
117 Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
118 }
119
120 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
121 if self
122 .conn_options
123 .db_type()
124 .support_rewrite_with_filters_limit(&self.sql)
125 {
126 Some(Arc::new(Self {
127 conn_options: self.conn_options.clone(),
128 sql: self.sql.clone(),
129 table_schema: self.table_schema.clone(),
130 remote_schema: self.remote_schema.clone(),
131 projection: self.projection.clone(),
132 filters: self.filters.clone(),
133 limit,
134 transform: self.transform.clone(),
135 conn: self.conn.clone(),
136 plan_properties: self.plan_properties.clone(),
137 }))
138 } else {
139 None
140 }
141 }
142
143 fn fetch(&self) -> Option<usize> {
144 self.limit
145 }
146}
147
148#[allow(clippy::too_many_arguments)]
149async fn build_and_transform_stream(
150 conn: Arc<dyn Connection>,
151 conn_options: ConnectionOptions,
152 sql: String,
153 table_schema: SchemaRef,
154 remote_schema: Option<RemoteSchemaRef>,
155 projection: Option<Vec<usize>>,
156 filters: Vec<Expr>,
157 limit: Option<usize>,
158 transform: Option<Arc<dyn Transform>>,
159) -> DFResult<SendableRecordBatchStream> {
160 let transformed_table_schema = transform_schema(
161 table_schema.clone(),
162 transform.as_ref(),
163 remote_schema.as_ref(),
164 )?;
165
166 let rewritten_filters =
167 rewrite_filters_column(filters, &table_schema, &transformed_table_schema)?;
168
169 let limit = if conn_options
170 .db_type()
171 .support_rewrite_with_filters_limit(&sql)
172 {
173 limit
174 } else {
175 None
176 };
177
178 let stream = conn
179 .query(
180 &conn_options,
181 &sql,
182 table_schema.clone(),
183 projection.as_ref(),
184 rewritten_filters.as_slice(),
185 limit,
186 )
187 .await?;
188
189 if let Some(transform) = transform.as_ref() {
190 Ok(Box::pin(TransformStream::try_new(
191 stream,
192 transform.clone(),
193 table_schema,
194 projection,
195 remote_schema,
196 )?))
197 } else {
198 Ok(stream)
199 }
200}
201
202fn rewrite_filters_column(
203 filters: Vec<Expr>,
204 table_schema: &SchemaRef,
205 transformed_table_schema: &SchemaRef,
206) -> DFResult<Vec<Expr>> {
207 filters
208 .into_iter()
209 .map(|f| {
210 f.transform_down(|e| {
211 if let Expr::Column(col) = e {
212 let col_idx = transformed_table_schema.index_of(col.name())?;
213 let row_name = table_schema.field(col_idx).name().to_string();
214 Ok(Transformed::yes(Expr::Column(Column::new_unqualified(
215 row_name,
216 ))))
217 } else {
218 Ok(Transformed::no(e))
219 }
220 })
221 .map(|trans| trans.data)
222 })
223 .collect::<DFResult<Vec<_>>>()
224}
225
226impl DisplayAs for RemoteTableExec {
227 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
228 write!(
229 f,
230 "RemoteTableExec: limit={:?}, filters=[{}]",
231 self.limit,
232 self.filters
233 .iter()
234 .map(|e| format!("{e}"))
235 .collect::<Vec<_>>()
236 .join(", ")
237 )
238 }
239}