1use crate::{
2 Connection, ConnectionOptions, DFResult, DefaultTransform, RemoteSchemaRef, RemoteSource,
3 Transform, TransformStream, transform_schema,
4};
5use datafusion::arrow::datatypes::SchemaRef;
6use datafusion::common::Statistics;
7use datafusion::common::stats::Precision;
8use datafusion::error::DataFusionError;
9use datafusion::execution::{SendableRecordBatchStream, TaskContext};
10use datafusion::physical_expr::{EquivalenceProperties, Partitioning};
11use datafusion::physical_plan::display::ProjectSchemaDisplay;
12use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
13use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
14use datafusion::physical_plan::{
15 DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties, project_schema,
16};
17use futures::TryStreamExt;
18use log::{debug, warn};
19use std::any::Any;
20use std::sync::Arc;
21
/// Physical plan node that scans a table (or raw query) on a remote database.
///
/// This is a leaf plan with a single partition: `execute` issues the query
/// over `conn` and optionally applies `transform` to each record batch.
#[derive(Debug)]
pub struct RemoteTableScanExec {
    /// Options used to connect to the remote database (also selects the dialect).
    pub(crate) conn_options: Arc<ConnectionOptions>,
    /// The remote table name or raw query being scanned.
    pub(crate) source: RemoteSource,
    /// Locally declared schema of the remote table, before transformation.
    pub(crate) table_schema: SchemaRef,
    /// Schema reported by the remote side; required by non-default transforms.
    pub(crate) remote_schema: Option<RemoteSchemaRef>,
    /// Column indices (into the transformed table schema) to output, if any.
    pub(crate) projection: Option<Vec<usize>>,
    /// Filters already rendered as SQL strings, pushed down into the remote query.
    pub(crate) unparsed_filters: Vec<String>,
    /// Optional row limit, pushed down only when the dialect supports it.
    pub(crate) limit: Option<usize>,
    /// Per-batch transformation applied to the raw remote stream.
    pub(crate) transform: Arc<dyn Transform>,
    /// Connection used to issue the remote query.
    pub(crate) conn: Arc<dyn Connection>,
    /// Cached plan properties: projected schema, single unknown partition,
    /// incremental bounded emission.
    plan_properties: PlanProperties,
}
35
36impl RemoteTableScanExec {
37 #[allow(clippy::too_many_arguments)]
38 pub fn try_new(
39 conn_options: Arc<ConnectionOptions>,
40 source: RemoteSource,
41 table_schema: SchemaRef,
42 remote_schema: Option<RemoteSchemaRef>,
43 projection: Option<Vec<usize>>,
44 unparsed_filters: Vec<String>,
45 limit: Option<usize>,
46 transform: Arc<dyn Transform>,
47 conn: Arc<dyn Connection>,
48 ) -> DFResult<Self> {
49 let transformed_table_schema = transform_schema(
50 transform.as_ref(),
51 table_schema.clone(),
52 remote_schema.as_ref(),
53 conn_options.db_type(),
54 )?;
55 let projected_schema = project_schema(&transformed_table_schema, projection.as_ref())?;
56 let plan_properties = PlanProperties::new(
57 EquivalenceProperties::new(projected_schema),
58 Partitioning::UnknownPartitioning(1),
59 EmissionType::Incremental,
60 Boundedness::Bounded,
61 );
62 Ok(Self {
63 conn_options,
64 source,
65 table_schema,
66 remote_schema,
67 projection,
68 unparsed_filters,
69 limit,
70 transform,
71 conn,
72 plan_properties,
73 })
74 }
75}
76
77impl ExecutionPlan for RemoteTableScanExec {
78 fn name(&self) -> &str {
79 "RemoteTableExec"
80 }
81
82 fn as_any(&self) -> &dyn Any {
83 self
84 }
85
86 fn properties(&self) -> &PlanProperties {
87 &self.plan_properties
88 }
89
90 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
91 vec![]
92 }
93
94 fn with_new_children(
95 self: Arc<Self>,
96 _children: Vec<Arc<dyn ExecutionPlan>>,
97 ) -> DFResult<Arc<dyn ExecutionPlan>> {
98 Ok(self)
99 }
100
101 fn execute(
102 &self,
103 partition: usize,
104 _context: Arc<TaskContext>,
105 ) -> DFResult<SendableRecordBatchStream> {
106 assert_eq!(partition, 0);
107 let schema = self.schema();
108 let fut = build_and_transform_stream(
109 self.conn.clone(),
110 self.conn_options.clone(),
111 self.source.clone(),
112 self.table_schema.clone(),
113 self.remote_schema.clone(),
114 self.projection.clone(),
115 self.unparsed_filters.clone(),
116 self.limit,
117 self.transform.clone(),
118 );
119 let stream = futures::stream::once(fut).try_flatten();
120 Ok(Box::pin(RecordBatchStreamAdapter::new(schema, stream)))
121 }
122
123 fn partition_statistics(&self, partition: Option<usize>) -> DFResult<Statistics> {
124 if let Some(partition) = partition
125 && partition != 0
126 {
127 return Err(DataFusionError::Plan(format!(
128 "Invalid partition index: {partition}"
129 )));
130 }
131 let db_type = self.conn_options.db_type();
132 let limit = if db_type.support_rewrite_with_filters_limit(&self.source) {
133 self.limit
134 } else {
135 None
136 };
137 let real_sql = db_type.rewrite_query(&self.source, &self.unparsed_filters, limit);
138
139 if let Some(count1_query) = db_type.try_count1_query(&RemoteSource::Query(real_sql)) {
140 let conn = self.conn.clone();
141 let conn_options = self.conn_options.clone();
142 let row_count_result = tokio::task::block_in_place(|| {
143 tokio::runtime::Handle::current().block_on(async {
144 db_type
145 .fetch_count(conn, &conn_options, &count1_query)
146 .await
147 })
148 });
149
150 match row_count_result {
151 Ok(row_count) => {
152 let column_stat = Statistics::unknown_column(self.schema().as_ref());
153 Ok(Statistics {
154 num_rows: Precision::Exact(row_count),
155 total_byte_size: Precision::Absent,
156 column_statistics: column_stat,
157 })
158 }
159 Err(e) => {
160 warn!("[remote-table] Failed to fetch exec statistics: {e}");
161 Err(e)
162 }
163 }
164 } else {
165 debug!(
166 "[remote-table] Query can not be rewritten as count1 query: {}",
167 self.source
168 );
169 Ok(Statistics::new_unknown(self.schema().as_ref()))
170 }
171 }
172
173 fn with_fetch(&self, limit: Option<usize>) -> Option<Arc<dyn ExecutionPlan>> {
174 let db_type = self.conn_options.db_type();
175 if db_type.support_rewrite_with_filters_limit(&self.source) {
176 Some(Arc::new(Self {
177 conn_options: self.conn_options.clone(),
178 source: self.source.clone(),
179 table_schema: self.table_schema.clone(),
180 remote_schema: self.remote_schema.clone(),
181 projection: self.projection.clone(),
182 unparsed_filters: self.unparsed_filters.clone(),
183 limit,
184 transform: self.transform.clone(),
185 conn: self.conn.clone(),
186 plan_properties: self.plan_properties.clone(),
187 }))
188 } else {
189 None
190 }
191 }
192
193 fn fetch(&self) -> Option<usize> {
194 self.limit
195 }
196}
197
198#[allow(clippy::too_many_arguments)]
199async fn build_and_transform_stream(
200 conn: Arc<dyn Connection>,
201 conn_options: Arc<ConnectionOptions>,
202 source: RemoteSource,
203 table_schema: SchemaRef,
204 remote_schema: Option<RemoteSchemaRef>,
205 projection: Option<Vec<usize>>,
206 unparsed_filters: Vec<String>,
207 limit: Option<usize>,
208 transform: Arc<dyn Transform>,
209) -> DFResult<SendableRecordBatchStream> {
210 let db_type = conn_options.db_type();
211 let limit = if db_type.support_rewrite_with_filters_limit(&source) {
212 limit
213 } else {
214 None
215 };
216
217 if transform.as_any().is::<DefaultTransform>() {
218 conn.query(
219 &conn_options,
220 &source,
221 table_schema.clone(),
222 projection.as_ref(),
223 unparsed_filters.as_slice(),
224 limit,
225 )
226 .await
227 } else {
228 let Some(remote_schema) = remote_schema else {
229 return Err(DataFusionError::Execution(
230 "remote_schema is required for non-default transform".to_string(),
231 ));
232 };
233 let stream = conn
234 .query(
235 &conn_options,
236 &source,
237 table_schema.clone(),
238 None,
239 unparsed_filters.as_slice(),
240 limit,
241 )
242 .await?;
243 Ok(Box::pin(TransformStream::try_new(
244 stream,
245 transform.clone(),
246 table_schema,
247 projection,
248 remote_schema,
249 db_type,
250 )?))
251 }
252}
253
254impl DisplayAs for RemoteTableScanExec {
255 fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
256 write!(
257 f,
258 "RemoteTableExec: source={}",
259 match &self.source {
260 RemoteSource::Query(_query) => "query".to_string(),
261 RemoteSource::Table(table) => table.join("."),
262 }
263 )?;
264 let projected_schema = self.schema();
265 if has_projection(self.projection.as_ref(), self.table_schema.fields().len()) {
266 write!(
267 f,
268 ", projection={}",
269 ProjectSchemaDisplay(&projected_schema)
270 )?;
271 }
272 if let Some(limit) = self.limit {
273 write!(f, ", limit={limit}")?;
274 }
275 if !self.unparsed_filters.is_empty() {
276 write!(f, ", filters=[{}]", self.unparsed_filters.join(", "))?;
277 }
278 Ok(())
279 }
280}
281
/// Returns `true` when `projection` is present and is not the identity
/// projection `[0, 1, .., table_columns - 1]` — i.e. it narrows or reorders
/// the table's columns. A missing projection means "all columns" and yields
/// `false`.
fn has_projection(projection: Option<&Vec<usize>>, table_columns: usize) -> bool {
    projection.is_some_and(|indices| {
        indices.len() != table_columns
            || indices.iter().enumerate().any(|(pos, &col)| col != pos)
    })
}