1use crate::{
2 ConnectionOptions, DFResult, DefaultLiteralizer, DefaultTransform, Literalize, Pool,
3 RemoteDbType, RemoteSchema, RemoteSchemaRef, RemoteTableInsertExec, RemoteTableScanExec,
4 Transform, TransformArgs, connect, transform_schema,
5};
6use datafusion::arrow::datatypes::SchemaRef;
7use datafusion::catalog::{Session, TableProvider};
8use datafusion::common::Statistics;
9use datafusion::common::stats::Precision;
10use datafusion::datasource::TableType;
11use datafusion::error::DataFusionError;
12use datafusion::logical_expr::dml::InsertOp;
13use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
14use datafusion::physical_plan::ExecutionPlan;
15use log::{debug, warn};
16use std::any::Any;
17use std::sync::Arc;
18
/// Describes what a [`RemoteTable`] reads from on the remote database.
///
/// A source is either a SQL query text or the identifier parts of a table.
/// Equality and hashing are structural, so sources can be compared in tests
/// or used as keys in hash-based collections.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum RemoteSource {
    /// SQL text; [`RemoteSource::query`] returns it unchanged.
    Query(String),
    /// Identifier parts of a table. They are rendered joined with `.` by the
    /// `Display` impl and turned into a select-all query by
    /// [`RemoteSource::query`].
    Table(Vec<String>),
}
24
25impl RemoteSource {
26 pub fn query(&self, db_type: RemoteDbType) -> String {
27 match self {
28 RemoteSource::Query(query) => query.clone(),
29 RemoteSource::Table(table_identifiers) => db_type.select_all_query(table_identifiers),
30 }
31 }
32}
33
34impl std::fmt::Display for RemoteSource {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 match self {
37 RemoteSource::Query(query) => write!(f, "{query}"),
38 RemoteSource::Table(table) => write!(f, "{}", table.join(".")),
39 }
40 }
41}
42
43impl From<String> for RemoteSource {
44 fn from(query: String) -> Self {
45 RemoteSource::Query(query)
46 }
47}
48
49impl From<&String> for RemoteSource {
50 fn from(query: &String) -> Self {
51 RemoteSource::Query(query.clone())
52 }
53}
54
55impl From<&str> for RemoteSource {
56 fn from(query: &str) -> Self {
57 RemoteSource::Query(query.to_string())
58 }
59}
60
61impl From<Vec<String>> for RemoteSource {
62 fn from(table_identifiers: Vec<String>) -> Self {
63 RemoteSource::Table(table_identifiers)
64 }
65}
66
67impl From<Vec<&str>> for RemoteSource {
68 fn from(table_identifiers: Vec<&str>) -> Self {
69 RemoteSource::Table(
70 table_identifiers
71 .into_iter()
72 .map(|s| s.to_string())
73 .collect(),
74 )
75 }
76}
77
78impl From<Vec<&String>> for RemoteSource {
79 fn from(table_identifiers: Vec<&String>) -> Self {
80 RemoteSource::Table(table_identifiers.into_iter().cloned().collect())
81 }
82}
83
/// A DataFusion table backed by a query or table on a remote database.
///
/// Instances are built by the `try_new*` constructors, which open the
/// connection pool and resolve the schemas stored here.
#[derive(Debug)]
pub struct RemoteTable {
    /// Options used to connect; also the source of the remote database type.
    pub(crate) conn_options: Arc<ConnectionOptions>,
    /// The remote query or table this provider reads from.
    pub(crate) source: RemoteSource,
    /// Arrow schema before the transform is applied: either supplied by the
    /// caller or derived from the remote schema.
    pub(crate) table_schema: SchemaRef,
    /// Schema produced by `transform_schema`; this is what
    /// `TableProvider::schema` reports.
    pub(crate) transformed_table_schema: SchemaRef,
    /// Remote schema, when known. It is `None` only when the caller supplied a
    /// table schema for a query source with the default transform.
    pub(crate) remote_schema: Option<RemoteSchemaRef>,
    /// Transform used to derive the exposed schema and to unparse filters.
    pub(crate) transform: Arc<dyn Transform>,
    /// Literalizer handed to the insert execution plan.
    pub(crate) literalizer: Arc<dyn Literalize>,
    /// Connection pool created at construction; scans and inserts take their
    /// connections from it.
    pub(crate) pool: Arc<dyn Pool>,
}
95
96impl RemoteTable {
97 pub async fn try_new(
98 conn_options: impl Into<ConnectionOptions>,
99 source: impl Into<RemoteSource>,
100 ) -> DFResult<Self> {
101 Self::try_new_with_schema_transform_literalizer(
102 conn_options,
103 source,
104 None,
105 None,
106 Arc::new(DefaultTransform {}),
107 Arc::new(DefaultLiteralizer {}),
108 )
109 .await
110 }
111
112 pub async fn try_new_with_schema(
113 conn_options: impl Into<ConnectionOptions>,
114 source: impl Into<RemoteSource>,
115 table_schema: SchemaRef,
116 ) -> DFResult<Self> {
117 Self::try_new_with_schema_transform_literalizer(
118 conn_options,
119 source,
120 Some(table_schema),
121 None,
122 Arc::new(DefaultTransform {}),
123 Arc::new(DefaultLiteralizer {}),
124 )
125 .await
126 }
127
128 pub async fn try_new_with_remote_schema(
129 conn_options: impl Into<ConnectionOptions>,
130 source: impl Into<RemoteSource>,
131 remote_schema: RemoteSchemaRef,
132 ) -> DFResult<Self> {
133 Self::try_new_with_schema_transform_literalizer(
134 conn_options,
135 source,
136 None,
137 Some(remote_schema),
138 Arc::new(DefaultTransform {}),
139 Arc::new(DefaultLiteralizer {}),
140 )
141 .await
142 }
143
144 pub async fn try_new_with_transform(
145 conn_options: impl Into<ConnectionOptions>,
146 source: impl Into<RemoteSource>,
147 transform: Arc<dyn Transform>,
148 ) -> DFResult<Self> {
149 Self::try_new_with_schema_transform_literalizer(
150 conn_options,
151 source,
152 None,
153 None,
154 transform,
155 Arc::new(DefaultLiteralizer {}),
156 )
157 .await
158 }
159
160 pub async fn try_new_with_schema_transform(
161 conn_options: impl Into<ConnectionOptions>,
162 source: impl Into<RemoteSource>,
163 table_schema: SchemaRef,
164 transform: Arc<dyn Transform>,
165 ) -> DFResult<Self> {
166 Self::try_new_with_schema_transform_literalizer(
167 conn_options,
168 source,
169 Some(table_schema),
170 None,
171 transform,
172 Arc::new(DefaultLiteralizer {}),
173 )
174 .await
175 }
176
177 pub async fn try_new_with_remote_schema_transform(
178 conn_options: impl Into<ConnectionOptions>,
179 source: impl Into<RemoteSource>,
180 remote_schema: RemoteSchemaRef,
181 transform: Arc<dyn Transform>,
182 ) -> DFResult<Self> {
183 Self::try_new_with_schema_transform_literalizer(
184 conn_options,
185 source,
186 None,
187 Some(remote_schema),
188 transform,
189 Arc::new(DefaultLiteralizer {}),
190 )
191 .await
192 }
193
194 pub async fn try_new_with_schema_transform_literalizer(
195 conn_options: impl Into<ConnectionOptions>,
196 source: impl Into<RemoteSource>,
197 table_schema: Option<SchemaRef>,
198 remote_schema: Option<RemoteSchemaRef>,
199 transform: Arc<dyn Transform>,
200 literalizer: Arc<dyn Literalize>,
201 ) -> DFResult<Self> {
202 let conn_options = conn_options.into();
203 let source = source.into();
204
205 if let RemoteSource::Table(table) = &source
206 && table.is_empty()
207 {
208 return Err(DataFusionError::Plan(
209 "Table source is empty vec".to_string(),
210 ));
211 }
212
213 let now = std::time::Instant::now();
214 let pool = connect(&conn_options).await?;
215 debug!(
216 "[remote-table] Creating connection pool cost: {}ms",
217 now.elapsed().as_millis()
218 );
219
220 let infer_schema_fn =
221 async |pool: &Arc<dyn Pool>, source: &RemoteSource| -> DFResult<RemoteSchemaRef> {
222 let now = std::time::Instant::now();
223 let conn = pool.get().await?;
224 let remote_schema = conn.infer_schema(source).await?;
225 debug!(
226 "[remote-table] Inferring remote schema cost: {}ms",
227 now.elapsed().as_millis()
228 );
229 Ok(remote_schema)
230 };
231
232 let (table_schema, remote_schema_opt): (SchemaRef, Option<RemoteSchemaRef>) =
233 match (table_schema, remote_schema) {
234 (Some(table_schema), Some(remote_schema)) => (table_schema, Some(remote_schema)),
235 (Some(table_schema), None) => {
236 let remote_schema = if transform.as_any().is::<DefaultTransform>()
237 && matches!(source, RemoteSource::Query(_))
238 {
239 None
240 } else {
241 let remote_schema = infer_schema_fn(&pool, &source).await?;
243 Some(remote_schema)
244 };
245 (table_schema, remote_schema)
246 }
247 (None, Some(remote_schema)) => (
248 Arc::new(remote_schema.to_arrow_schema()),
249 Some(remote_schema),
250 ),
251 (None, None) => {
252 let remote_schema = infer_schema_fn(&pool, &source).await?;
254 let inferred_table_schema = Arc::new(remote_schema.to_arrow_schema());
255 (inferred_table_schema, Some(remote_schema))
256 }
257 };
258
259 if let Some(remote_schema) = &remote_schema_opt
260 && table_schema.fields.len() != remote_schema.fields.len()
261 {
262 return Err(DataFusionError::Plan(format!(
263 "fields length of table schema is not matched with remote schema. table schema: {table_schema}, remote schema: {remote_schema:?}"
264 )));
265 }
266
267 let transformed_table_schema = transform_schema(
268 transform.as_ref(),
269 table_schema.clone(),
270 remote_schema_opt.as_ref(),
271 conn_options.db_type(),
272 )?;
273
274 Ok(RemoteTable {
275 conn_options: Arc::new(conn_options),
276 source,
277 table_schema,
278 transformed_table_schema,
279 remote_schema: remote_schema_opt,
280 transform,
281 literalizer,
282 pool,
283 })
284 }
285
286 pub fn remote_schema(&self) -> Option<RemoteSchemaRef> {
287 self.remote_schema.clone()
288 }
289
290 pub fn pool(&self) -> &Arc<dyn Pool> {
291 &self.pool
292 }
293}
294
295#[async_trait::async_trait]
296impl TableProvider for RemoteTable {
297 fn as_any(&self) -> &dyn Any {
298 self
299 }
300
301 fn schema(&self) -> SchemaRef {
302 self.transformed_table_schema.clone()
303 }
304
305 fn table_type(&self) -> TableType {
306 TableType::Base
307 }
308
309 async fn scan(
310 &self,
311 _state: &dyn Session,
312 projection: Option<&Vec<usize>>,
313 filters: &[Expr],
314 limit: Option<usize>,
315 ) -> DFResult<Arc<dyn ExecutionPlan>> {
316 let remote_schema = if self.transform.as_any().is::<DefaultTransform>() {
317 Arc::new(RemoteSchema::empty())
318 } else {
319 let Some(remote_schema) = &self.remote_schema else {
320 return Err(DataFusionError::Plan(
321 "remote schema is none but transform is not DefaultTransform".to_string(),
322 ));
323 };
324 remote_schema.clone()
325 };
326 let mut unparsed_filters = vec![];
327 for filter in filters {
328 let args = TransformArgs {
329 db_type: self.conn_options.db_type(),
330 table_schema: &self.table_schema,
331 remote_schema: &remote_schema,
332 };
333 unparsed_filters.push(self.transform.unparse_filter(filter, args)?);
334 }
335
336 let now = std::time::Instant::now();
337 let conn = self.pool.get().await?;
338 debug!(
339 "[remote-table] Getting connection from pool cost: {}ms",
340 now.elapsed().as_millis()
341 );
342
343 Ok(Arc::new(RemoteTableScanExec::try_new(
344 self.conn_options.clone(),
345 self.source.clone(),
346 self.table_schema.clone(),
347 self.remote_schema.clone(),
348 projection.cloned(),
349 unparsed_filters,
350 limit,
351 self.transform.clone(),
352 conn,
353 )?))
354 }
355
356 fn supports_filters_pushdown(
357 &self,
358 filters: &[&Expr],
359 ) -> DFResult<Vec<TableProviderFilterPushDown>> {
360 let db_type = self.conn_options.db_type();
361 if !db_type.support_rewrite_with_filters_limit(&self.source) {
362 return Ok(vec![
363 TableProviderFilterPushDown::Unsupported;
364 filters.len()
365 ]);
366 }
367
368 let remote_schema = if self.transform.as_any().is::<DefaultTransform>() {
369 Arc::new(RemoteSchema::empty())
370 } else {
371 let Some(remote_schema) = &self.remote_schema else {
372 return Err(DataFusionError::Plan(
373 "remote schema is none but transform is not DefaultTransform".to_string(),
374 ));
375 };
376 remote_schema.clone()
377 };
378
379 let mut pushdown = vec![];
380 for filter in filters {
381 let args = TransformArgs {
382 db_type: self.conn_options.db_type(),
383 table_schema: &self.table_schema,
384 remote_schema: &remote_schema,
385 };
386 pushdown.push(self.transform.support_filter_pushdown(filter, args)?);
387 }
388 Ok(pushdown)
389 }
390
391 fn statistics(&self) -> Option<Statistics> {
392 let db_type = self.conn_options.db_type();
393 if let Some(count1_query) = db_type.try_count1_query(&self.source) {
394 let conn_options = self.conn_options.clone();
395 let row_count_result = tokio::task::block_in_place(|| {
396 tokio::runtime::Handle::current().block_on(async {
397 let pool = connect(&conn_options).await?;
398 let conn = pool.get().await?;
399 conn_options
400 .db_type()
401 .fetch_count(conn, &conn_options, &count1_query)
402 .await
403 })
404 });
405
406 match row_count_result {
407 Ok(row_count) => {
408 let column_stat =
409 Statistics::unknown_column(self.transformed_table_schema.as_ref());
410 Some(Statistics {
411 num_rows: Precision::Exact(row_count),
412 total_byte_size: Precision::Absent,
413 column_statistics: column_stat,
414 })
415 }
416 Err(e) => {
417 warn!("[remote-table] Failed to fetch table statistics: {e}");
418 None
419 }
420 }
421 } else {
422 debug!(
423 "[remote-table] Query can not be rewritten as count1 query: {}",
424 self.source
425 );
426 None
427 }
428 }
429
430 async fn insert_into(
431 &self,
432 _state: &dyn Session,
433 input: Arc<dyn ExecutionPlan>,
434 insert_op: InsertOp,
435 ) -> DFResult<Arc<dyn ExecutionPlan>> {
436 match insert_op {
437 InsertOp::Append => {}
438 InsertOp::Overwrite | InsertOp::Replace => {
439 return Err(DataFusionError::Execution(
440 "Only support append insert operation".to_string(),
441 ));
442 }
443 }
444
445 let remote_schema = self
446 .remote_schema
447 .as_ref()
448 .ok_or(DataFusionError::Execution(
449 "Remote schema is not available".to_string(),
450 ))?
451 .clone();
452
453 let RemoteSource::Table(table) = &self.source else {
454 return Err(DataFusionError::Execution(
455 "Only support insert operation for table".to_string(),
456 ));
457 };
458
459 let now = std::time::Instant::now();
460 let conn = self.pool.get().await?;
461 debug!(
462 "[remote-table] Getting connection from pool cost: {}ms",
463 now.elapsed().as_millis()
464 );
465
466 let exec = RemoteTableInsertExec::new(
467 input,
468 self.conn_options.clone(),
469 self.literalizer.clone(),
470 table.clone(),
471 remote_schema,
472 conn,
473 );
474 Ok(Arc::new(exec))
475 }
476}