1use crate::{
2 ConnectionOptions, DFResult, DefaultLiteralizer, DefaultTransform, Literalize, Pool,
3 RemoteDbType, RemoteSchema, RemoteSchemaRef, RemoteTableInsertExec, RemoteTableScanExec,
4 Transform, TransformArgs, connect, transform_schema,
5};
6use datafusion::arrow::datatypes::SchemaRef;
7use datafusion::catalog::{Session, TableProvider};
8use datafusion::common::Statistics;
9use datafusion::common::stats::Precision;
10use datafusion::datasource::TableType;
11use datafusion::error::DataFusionError;
12use datafusion::logical_expr::dml::InsertOp;
13use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
14use datafusion::physical_plan::ExecutionPlan;
15use log::{debug, warn};
16use std::any::Any;
17use std::sync::Arc;
18
/// Describes what a remote table reads from: a query string or a named table.
#[derive(Debug, Clone)]
pub enum RemoteSource {
    /// Query text; [`RemoteSource::query`] returns a copy of it unchanged.
    Query(String),
    /// Identifier parts of a table; `Display` renders them joined by `.`.
    Table(Vec<String>),
}
24
25impl RemoteSource {
26 pub fn query(&self, db_type: RemoteDbType) -> String {
27 match self {
28 RemoteSource::Query(query) => query.clone(),
29 RemoteSource::Table(table_identifiers) => db_type.select_all_query(table_identifiers),
30 }
31 }
32}
33
34impl std::fmt::Display for RemoteSource {
35 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36 match self {
37 RemoteSource::Query(query) => write!(f, "{query}"),
38 RemoteSource::Table(table) => write!(f, "{}", table.join(".")),
39 }
40 }
41}
42
43impl From<String> for RemoteSource {
44 fn from(query: String) -> Self {
45 RemoteSource::Query(query)
46 }
47}
48
49impl From<&String> for RemoteSource {
50 fn from(query: &String) -> Self {
51 RemoteSource::Query(query.clone())
52 }
53}
54
55impl From<&str> for RemoteSource {
56 fn from(query: &str) -> Self {
57 RemoteSource::Query(query.to_string())
58 }
59}
60
61impl From<Vec<String>> for RemoteSource {
62 fn from(table_identifiers: Vec<String>) -> Self {
63 RemoteSource::Table(table_identifiers)
64 }
65}
66
67impl From<Vec<&str>> for RemoteSource {
68 fn from(table_identifiers: Vec<&str>) -> Self {
69 RemoteSource::Table(
70 table_identifiers
71 .into_iter()
72 .map(|s| s.to_string())
73 .collect(),
74 )
75 }
76}
77
78impl From<Vec<&String>> for RemoteSource {
79 fn from(table_identifiers: Vec<&String>) -> Self {
80 RemoteSource::Table(table_identifiers.into_iter().cloned().collect())
81 }
82}
83
84#[derive(Debug)]
85pub struct RemoteTable {
86 pub(crate) conn_options: Arc<ConnectionOptions>,
87 pub(crate) source: RemoteSource,
88 pub(crate) table_schema: SchemaRef,
89 pub(crate) transformed_table_schema: SchemaRef,
90 pub(crate) remote_schema: Option<RemoteSchemaRef>,
91 pub(crate) transform: Arc<dyn Transform>,
92 pub(crate) literalizer: Arc<dyn Literalize>,
93 pub(crate) pool: Arc<dyn Pool>,
94}
95
96impl RemoteTable {
97 pub async fn try_new(
98 conn_options: impl Into<ConnectionOptions>,
99 source: impl Into<RemoteSource>,
100 ) -> DFResult<Self> {
101 Self::try_new_with_schema_transform_literalizer(
102 conn_options,
103 source,
104 None,
105 None,
106 Arc::new(DefaultTransform {}),
107 Arc::new(DefaultLiteralizer {}),
108 )
109 .await
110 }
111
112 pub async fn try_new_with_schema(
113 conn_options: impl Into<ConnectionOptions>,
114 source: impl Into<RemoteSource>,
115 table_schema: SchemaRef,
116 ) -> DFResult<Self> {
117 Self::try_new_with_schema_transform_literalizer(
118 conn_options,
119 source,
120 Some(table_schema),
121 None,
122 Arc::new(DefaultTransform {}),
123 Arc::new(DefaultLiteralizer {}),
124 )
125 .await
126 }
127
128 pub async fn try_new_with_remote_schema(
129 conn_options: impl Into<ConnectionOptions>,
130 source: impl Into<RemoteSource>,
131 remote_schema: RemoteSchemaRef,
132 ) -> DFResult<Self> {
133 Self::try_new_with_schema_transform_literalizer(
134 conn_options,
135 source,
136 None,
137 Some(remote_schema),
138 Arc::new(DefaultTransform {}),
139 Arc::new(DefaultLiteralizer {}),
140 )
141 .await
142 }
143
144 pub async fn try_new_with_transform(
145 conn_options: impl Into<ConnectionOptions>,
146 source: impl Into<RemoteSource>,
147 transform: Arc<dyn Transform>,
148 ) -> DFResult<Self> {
149 Self::try_new_with_schema_transform_literalizer(
150 conn_options,
151 source,
152 None,
153 None,
154 transform,
155 Arc::new(DefaultLiteralizer {}),
156 )
157 .await
158 }
159
160 pub async fn try_new_with_schema_transform(
161 conn_options: impl Into<ConnectionOptions>,
162 source: impl Into<RemoteSource>,
163 table_schema: SchemaRef,
164 transform: Arc<dyn Transform>,
165 ) -> DFResult<Self> {
166 Self::try_new_with_schema_transform_literalizer(
167 conn_options,
168 source,
169 Some(table_schema),
170 None,
171 transform,
172 Arc::new(DefaultLiteralizer {}),
173 )
174 .await
175 }
176
177 pub async fn try_new_with_remote_schema_transform(
178 conn_options: impl Into<ConnectionOptions>,
179 source: impl Into<RemoteSource>,
180 remote_schema: RemoteSchemaRef,
181 transform: Arc<dyn Transform>,
182 ) -> DFResult<Self> {
183 Self::try_new_with_schema_transform_literalizer(
184 conn_options,
185 source,
186 None,
187 Some(remote_schema),
188 transform,
189 Arc::new(DefaultLiteralizer {}),
190 )
191 .await
192 }
193
194 pub async fn try_new_with_schema_transform_literalizer(
195 conn_options: impl Into<ConnectionOptions>,
196 source: impl Into<RemoteSource>,
197 table_schema: Option<SchemaRef>,
198 remote_schema: Option<RemoteSchemaRef>,
199 transform: Arc<dyn Transform>,
200 literalizer: Arc<dyn Literalize>,
201 ) -> DFResult<Self> {
202 let conn_options = conn_options.into();
203 let source = source.into();
204
205 if let RemoteSource::Table(table) = &source
206 && table.is_empty()
207 {
208 return Err(DataFusionError::Plan(
209 "Table source is empty vec".to_string(),
210 ));
211 }
212
213 let now = std::time::Instant::now();
214 let pool = connect(&conn_options).await?;
215 debug!(
216 "[remote-table] Creating connection pool cost: {}ms",
217 now.elapsed().as_millis()
218 );
219
220 let infer_schema_fn =
221 async |pool: &Arc<dyn Pool>, source: &RemoteSource| -> DFResult<RemoteSchemaRef> {
222 let now = std::time::Instant::now();
223 let conn = pool.get().await?;
224 let remote_schema = conn.infer_schema(source).await?;
225 debug!(
226 "[remote-table] Inferring remote schema cost: {}ms",
227 now.elapsed().as_millis()
228 );
229 Ok(remote_schema)
230 };
231
232 let (table_schema, remote_schema_opt): (SchemaRef, Option<RemoteSchemaRef>) =
233 match (table_schema, remote_schema) {
234 (Some(table_schema), Some(remote_schema)) => (table_schema, Some(remote_schema)),
235 (Some(table_schema), None) => {
236 let remote_schema = if transform.as_any().is::<DefaultTransform>()
237 && matches!(source, RemoteSource::Query(_))
238 {
239 None
240 } else {
241 let remote_schema = infer_schema_fn(&pool, &source).await?;
243 Some(remote_schema)
244 };
245 (table_schema, remote_schema)
246 }
247 (None, Some(remote_schema)) => (
248 Arc::new(remote_schema.to_arrow_schema()),
249 Some(remote_schema),
250 ),
251 (None, None) => {
252 let remote_schema = infer_schema_fn(&pool, &source).await?;
254 let inferred_table_schema = Arc::new(remote_schema.to_arrow_schema());
255 (inferred_table_schema, Some(remote_schema))
256 }
257 };
258
259 if let Some(remote_schema) = &remote_schema_opt
260 && table_schema.fields.len() != remote_schema.fields.len()
261 {
262 return Err(DataFusionError::Plan(format!(
263 "fields length of table schema is not matched with remote schema. table schema: {table_schema}, remote schema: {remote_schema:?}"
264 )));
265 }
266
267 let transformed_table_schema = transform_schema(
268 transform.as_ref(),
269 table_schema.clone(),
270 remote_schema_opt.as_ref(),
271 conn_options.db_type(),
272 )?;
273
274 Ok(RemoteTable {
275 conn_options: Arc::new(conn_options),
276 source,
277 table_schema,
278 transformed_table_schema,
279 remote_schema: remote_schema_opt,
280 transform,
281 literalizer,
282 pool,
283 })
284 }
285
286 pub fn remote_schema(&self) -> Option<RemoteSchemaRef> {
287 self.remote_schema.clone()
288 }
289}
290
291#[async_trait::async_trait]
292impl TableProvider for RemoteTable {
293 fn as_any(&self) -> &dyn Any {
294 self
295 }
296
297 fn schema(&self) -> SchemaRef {
298 self.transformed_table_schema.clone()
299 }
300
301 fn table_type(&self) -> TableType {
302 TableType::Base
303 }
304
305 async fn scan(
306 &self,
307 _state: &dyn Session,
308 projection: Option<&Vec<usize>>,
309 filters: &[Expr],
310 limit: Option<usize>,
311 ) -> DFResult<Arc<dyn ExecutionPlan>> {
312 let remote_schema = if self.transform.as_any().is::<DefaultTransform>() {
313 Arc::new(RemoteSchema::empty())
314 } else {
315 let Some(remote_schema) = &self.remote_schema else {
316 return Err(DataFusionError::Plan(
317 "remote schema is none but transform is not DefaultTransform".to_string(),
318 ));
319 };
320 remote_schema.clone()
321 };
322 let mut unparsed_filters = vec![];
323 for filter in filters {
324 let args = TransformArgs {
325 db_type: self.conn_options.db_type(),
326 table_schema: &self.table_schema,
327 remote_schema: &remote_schema,
328 };
329 unparsed_filters.push(self.transform.unparse_filter(filter, args)?);
330 }
331
332 let now = std::time::Instant::now();
333 let conn = self.pool.get().await?;
334 debug!(
335 "[remote-table] Getting connection from pool cost: {}ms",
336 now.elapsed().as_millis()
337 );
338
339 Ok(Arc::new(RemoteTableScanExec::try_new(
340 self.conn_options.clone(),
341 self.source.clone(),
342 self.table_schema.clone(),
343 self.remote_schema.clone(),
344 projection.cloned(),
345 unparsed_filters,
346 limit,
347 self.transform.clone(),
348 conn,
349 )?))
350 }
351
352 fn supports_filters_pushdown(
353 &self,
354 filters: &[&Expr],
355 ) -> DFResult<Vec<TableProviderFilterPushDown>> {
356 let db_type = self.conn_options.db_type();
357 if !db_type.support_rewrite_with_filters_limit(&self.source) {
358 return Ok(vec![
359 TableProviderFilterPushDown::Unsupported;
360 filters.len()
361 ]);
362 }
363
364 let remote_schema = if self.transform.as_any().is::<DefaultTransform>() {
365 Arc::new(RemoteSchema::empty())
366 } else {
367 let Some(remote_schema) = &self.remote_schema else {
368 return Err(DataFusionError::Plan(
369 "remote schema is none but transform is not DefaultTransform".to_string(),
370 ));
371 };
372 remote_schema.clone()
373 };
374
375 let mut pushdown = vec![];
376 for filter in filters {
377 let args = TransformArgs {
378 db_type: self.conn_options.db_type(),
379 table_schema: &self.table_schema,
380 remote_schema: &remote_schema,
381 };
382 pushdown.push(self.transform.support_filter_pushdown(filter, args)?);
383 }
384 Ok(pushdown)
385 }
386
387 fn statistics(&self) -> Option<Statistics> {
388 let db_type = self.conn_options.db_type();
389 if let Some(count1_query) = db_type.try_count1_query(&self.source) {
390 let conn_options = self.conn_options.clone();
391 let row_count_result = tokio::task::block_in_place(|| {
392 tokio::runtime::Handle::current().block_on(async {
393 let pool = connect(&conn_options).await?;
394 let conn = pool.get().await?;
395 conn_options
396 .db_type()
397 .fetch_count(conn, &conn_options, &count1_query)
398 .await
399 })
400 });
401
402 match row_count_result {
403 Ok(row_count) => {
404 let column_stat =
405 Statistics::unknown_column(self.transformed_table_schema.as_ref());
406 Some(Statistics {
407 num_rows: Precision::Exact(row_count),
408 total_byte_size: Precision::Absent,
409 column_statistics: column_stat,
410 })
411 }
412 Err(e) => {
413 warn!("[remote-table] Failed to fetch table statistics: {e}");
414 None
415 }
416 }
417 } else {
418 debug!(
419 "[remote-table] Query can not be rewritten as count1 query: {}",
420 self.source
421 );
422 None
423 }
424 }
425
426 async fn insert_into(
427 &self,
428 _state: &dyn Session,
429 input: Arc<dyn ExecutionPlan>,
430 insert_op: InsertOp,
431 ) -> DFResult<Arc<dyn ExecutionPlan>> {
432 match insert_op {
433 InsertOp::Append => {}
434 InsertOp::Overwrite | InsertOp::Replace => {
435 return Err(DataFusionError::Execution(
436 "Only support append insert operation".to_string(),
437 ));
438 }
439 }
440
441 let remote_schema = self
442 .remote_schema
443 .as_ref()
444 .ok_or(DataFusionError::Execution(
445 "Remote schema is not available".to_string(),
446 ))?
447 .clone();
448
449 let RemoteSource::Table(table) = &self.source else {
450 return Err(DataFusionError::Execution(
451 "Only support insert operation for table".to_string(),
452 ));
453 };
454
455 let now = std::time::Instant::now();
456 let conn = self.pool.get().await?;
457 debug!(
458 "[remote-table] Getting connection from pool cost: {}ms",
459 now.elapsed().as_millis()
460 );
461
462 let exec = RemoteTableInsertExec::new(
463 input,
464 self.conn_options.clone(),
465 self.literalizer.clone(),
466 table.clone(),
467 remote_schema,
468 conn,
469 );
470 Ok(Arc::new(exec))
471 }
472}