datafusion_remote_table/
table.rs

1use crate::{
2    ConnectionOptions, DFResult, DefaultLiteralizer, DefaultTransform, Literalize, Pool,
3    RemoteDbType, RemoteSchema, RemoteSchemaRef, RemoteTableInsertExec, RemoteTableScanExec,
4    Transform, TransformArgs, connect, transform_schema,
5};
6use datafusion::arrow::datatypes::SchemaRef;
7use datafusion::catalog::{Session, TableProvider};
8use datafusion::common::Statistics;
9use datafusion::common::stats::Precision;
10use datafusion::datasource::TableType;
11use datafusion::error::DataFusionError;
12use datafusion::logical_expr::dml::InsertOp;
13use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
14use datafusion::physical_plan::ExecutionPlan;
15use log::{debug, warn};
16use std::any::Any;
17use std::sync::Arc;
18
/// Describes what is read from (or written to) the remote database.
#[derive(Debug, Clone)]
pub enum RemoteSource {
    /// A SQL query string that is used as-is.
    Query(String),
    /// The identifier parts of a table; `Display` joins them with `.`.
    Table(Vec<String>),
}
24
25impl RemoteSource {
26    pub fn query(&self, db_type: RemoteDbType) -> String {
27        match self {
28            RemoteSource::Query(query) => query.clone(),
29            RemoteSource::Table(table_identifiers) => db_type.select_all_query(table_identifiers),
30        }
31    }
32}
33
34impl std::fmt::Display for RemoteSource {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        match self {
37            RemoteSource::Query(query) => write!(f, "{query}"),
38            RemoteSource::Table(table) => write!(f, "{}", table.join(".")),
39        }
40    }
41}
42
43impl From<String> for RemoteSource {
44    fn from(query: String) -> Self {
45        RemoteSource::Query(query)
46    }
47}
48
49impl From<&String> for RemoteSource {
50    fn from(query: &String) -> Self {
51        RemoteSource::Query(query.clone())
52    }
53}
54
55impl From<&str> for RemoteSource {
56    fn from(query: &str) -> Self {
57        RemoteSource::Query(query.to_string())
58    }
59}
60
61impl From<Vec<String>> for RemoteSource {
62    fn from(table_identifiers: Vec<String>) -> Self {
63        RemoteSource::Table(table_identifiers)
64    }
65}
66
67impl From<Vec<&str>> for RemoteSource {
68    fn from(table_identifiers: Vec<&str>) -> Self {
69        RemoteSource::Table(
70            table_identifiers
71                .into_iter()
72                .map(|s| s.to_string())
73                .collect(),
74        )
75    }
76}
77
78impl From<Vec<&String>> for RemoteSource {
79    fn from(table_identifiers: Vec<&String>) -> Self {
80        RemoteSource::Table(table_identifiers.into_iter().cloned().collect())
81    }
82}
83
84#[derive(Debug)]
85pub struct RemoteTable {
86    pub(crate) conn_options: Arc<ConnectionOptions>,
87    pub(crate) source: RemoteSource,
88    pub(crate) table_schema: SchemaRef,
89    pub(crate) transformed_table_schema: SchemaRef,
90    pub(crate) remote_schema: Option<RemoteSchemaRef>,
91    pub(crate) transform: Arc<dyn Transform>,
92    pub(crate) literalizer: Arc<dyn Literalize>,
93    pub(crate) pool: Arc<dyn Pool>,
94}
95
96impl RemoteTable {
97    pub async fn try_new(
98        conn_options: impl Into<ConnectionOptions>,
99        source: impl Into<RemoteSource>,
100    ) -> DFResult<Self> {
101        Self::try_new_with_schema_transform_literalizer(
102            conn_options,
103            source,
104            None,
105            None,
106            Arc::new(DefaultTransform {}),
107            Arc::new(DefaultLiteralizer {}),
108        )
109        .await
110    }
111
112    pub async fn try_new_with_schema(
113        conn_options: impl Into<ConnectionOptions>,
114        source: impl Into<RemoteSource>,
115        table_schema: SchemaRef,
116    ) -> DFResult<Self> {
117        Self::try_new_with_schema_transform_literalizer(
118            conn_options,
119            source,
120            Some(table_schema),
121            None,
122            Arc::new(DefaultTransform {}),
123            Arc::new(DefaultLiteralizer {}),
124        )
125        .await
126    }
127
128    pub async fn try_new_with_remote_schema(
129        conn_options: impl Into<ConnectionOptions>,
130        source: impl Into<RemoteSource>,
131        remote_schema: RemoteSchemaRef,
132    ) -> DFResult<Self> {
133        Self::try_new_with_schema_transform_literalizer(
134            conn_options,
135            source,
136            None,
137            Some(remote_schema),
138            Arc::new(DefaultTransform {}),
139            Arc::new(DefaultLiteralizer {}),
140        )
141        .await
142    }
143
144    pub async fn try_new_with_transform(
145        conn_options: impl Into<ConnectionOptions>,
146        source: impl Into<RemoteSource>,
147        transform: Arc<dyn Transform>,
148    ) -> DFResult<Self> {
149        Self::try_new_with_schema_transform_literalizer(
150            conn_options,
151            source,
152            None,
153            None,
154            transform,
155            Arc::new(DefaultLiteralizer {}),
156        )
157        .await
158    }
159
160    pub async fn try_new_with_schema_transform(
161        conn_options: impl Into<ConnectionOptions>,
162        source: impl Into<RemoteSource>,
163        table_schema: SchemaRef,
164        transform: Arc<dyn Transform>,
165    ) -> DFResult<Self> {
166        Self::try_new_with_schema_transform_literalizer(
167            conn_options,
168            source,
169            Some(table_schema),
170            None,
171            transform,
172            Arc::new(DefaultLiteralizer {}),
173        )
174        .await
175    }
176
177    pub async fn try_new_with_remote_schema_transform(
178        conn_options: impl Into<ConnectionOptions>,
179        source: impl Into<RemoteSource>,
180        remote_schema: RemoteSchemaRef,
181        transform: Arc<dyn Transform>,
182    ) -> DFResult<Self> {
183        Self::try_new_with_schema_transform_literalizer(
184            conn_options,
185            source,
186            None,
187            Some(remote_schema),
188            transform,
189            Arc::new(DefaultLiteralizer {}),
190        )
191        .await
192    }
193
194    pub async fn try_new_with_schema_transform_literalizer(
195        conn_options: impl Into<ConnectionOptions>,
196        source: impl Into<RemoteSource>,
197        table_schema: Option<SchemaRef>,
198        remote_schema: Option<RemoteSchemaRef>,
199        transform: Arc<dyn Transform>,
200        literalizer: Arc<dyn Literalize>,
201    ) -> DFResult<Self> {
202        let conn_options = conn_options.into();
203        let source = source.into();
204
205        if let RemoteSource::Table(table) = &source
206            && table.is_empty()
207        {
208            return Err(DataFusionError::Plan(
209                "Table source is empty vec".to_string(),
210            ));
211        }
212
213        let now = std::time::Instant::now();
214        let pool = connect(&conn_options).await?;
215        debug!(
216            "[remote-table] Creating connection pool cost: {}ms",
217            now.elapsed().as_millis()
218        );
219
220        let infer_schema_fn =
221            async |pool: &Arc<dyn Pool>, source: &RemoteSource| -> DFResult<RemoteSchemaRef> {
222                let now = std::time::Instant::now();
223                let conn = pool.get().await?;
224                let remote_schema = conn.infer_schema(source).await?;
225                debug!(
226                    "[remote-table] Inferring remote schema cost: {}ms",
227                    now.elapsed().as_millis()
228                );
229                Ok(remote_schema)
230            };
231
232        let (table_schema, remote_schema_opt): (SchemaRef, Option<RemoteSchemaRef>) =
233            match (table_schema, remote_schema) {
234                (Some(table_schema), Some(remote_schema)) => (table_schema, Some(remote_schema)),
235                (Some(table_schema), None) => {
236                    let remote_schema = if transform.as_any().is::<DefaultTransform>()
237                        && matches!(source, RemoteSource::Query(_))
238                    {
239                        None
240                    } else {
241                        // Infer remote schema
242                        let remote_schema = infer_schema_fn(&pool, &source).await?;
243                        Some(remote_schema)
244                    };
245                    (table_schema, remote_schema)
246                }
247                (None, Some(remote_schema)) => (
248                    Arc::new(remote_schema.to_arrow_schema()),
249                    Some(remote_schema),
250                ),
251                (None, None) => {
252                    // Infer table schema
253                    let remote_schema = infer_schema_fn(&pool, &source).await?;
254                    let inferred_table_schema = Arc::new(remote_schema.to_arrow_schema());
255                    (inferred_table_schema, Some(remote_schema))
256                }
257            };
258
259        if let Some(remote_schema) = &remote_schema_opt
260            && table_schema.fields.len() != remote_schema.fields.len()
261        {
262            return Err(DataFusionError::Plan(format!(
263                "fields length of table schema is not matched with remote schema. table schema: {table_schema}, remote schema: {remote_schema:?}"
264            )));
265        }
266
267        let transformed_table_schema = transform_schema(
268            transform.as_ref(),
269            table_schema.clone(),
270            remote_schema_opt.as_ref(),
271            conn_options.db_type(),
272        )?;
273
274        Ok(RemoteTable {
275            conn_options: Arc::new(conn_options),
276            source,
277            table_schema,
278            transformed_table_schema,
279            remote_schema: remote_schema_opt,
280            transform,
281            literalizer,
282            pool,
283        })
284    }
285
286    pub fn remote_schema(&self) -> Option<RemoteSchemaRef> {
287        self.remote_schema.clone()
288    }
289
290    pub fn pool(&self) -> &Arc<dyn Pool> {
291        &self.pool
292    }
293}
294
295#[async_trait::async_trait]
296impl TableProvider for RemoteTable {
297    fn as_any(&self) -> &dyn Any {
298        self
299    }
300
301    fn schema(&self) -> SchemaRef {
302        self.transformed_table_schema.clone()
303    }
304
305    fn table_type(&self) -> TableType {
306        TableType::Base
307    }
308
309    async fn scan(
310        &self,
311        _state: &dyn Session,
312        projection: Option<&Vec<usize>>,
313        filters: &[Expr],
314        limit: Option<usize>,
315    ) -> DFResult<Arc<dyn ExecutionPlan>> {
316        let remote_schema = if self.transform.as_any().is::<DefaultTransform>() {
317            Arc::new(RemoteSchema::empty())
318        } else {
319            let Some(remote_schema) = &self.remote_schema else {
320                return Err(DataFusionError::Plan(
321                    "remote schema is none but transform is not DefaultTransform".to_string(),
322                ));
323            };
324            remote_schema.clone()
325        };
326        let mut unparsed_filters = vec![];
327        for filter in filters {
328            let args = TransformArgs {
329                db_type: self.conn_options.db_type(),
330                table_schema: &self.table_schema,
331                remote_schema: &remote_schema,
332            };
333            unparsed_filters.push(self.transform.unparse_filter(filter, args)?);
334        }
335
336        let now = std::time::Instant::now();
337        let conn = self.pool.get().await?;
338        debug!(
339            "[remote-table] Getting connection from pool cost: {}ms",
340            now.elapsed().as_millis()
341        );
342
343        Ok(Arc::new(RemoteTableScanExec::try_new(
344            self.conn_options.clone(),
345            self.source.clone(),
346            self.table_schema.clone(),
347            self.remote_schema.clone(),
348            projection.cloned(),
349            unparsed_filters,
350            limit,
351            self.transform.clone(),
352            conn,
353        )?))
354    }
355
356    fn supports_filters_pushdown(
357        &self,
358        filters: &[&Expr],
359    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
360        let db_type = self.conn_options.db_type();
361        if !db_type.support_rewrite_with_filters_limit(&self.source) {
362            return Ok(vec![
363                TableProviderFilterPushDown::Unsupported;
364                filters.len()
365            ]);
366        }
367
368        let remote_schema = if self.transform.as_any().is::<DefaultTransform>() {
369            Arc::new(RemoteSchema::empty())
370        } else {
371            let Some(remote_schema) = &self.remote_schema else {
372                return Err(DataFusionError::Plan(
373                    "remote schema is none but transform is not DefaultTransform".to_string(),
374                ));
375            };
376            remote_schema.clone()
377        };
378
379        let mut pushdown = vec![];
380        for filter in filters {
381            let args = TransformArgs {
382                db_type: self.conn_options.db_type(),
383                table_schema: &self.table_schema,
384                remote_schema: &remote_schema,
385            };
386            pushdown.push(self.transform.support_filter_pushdown(filter, args)?);
387        }
388        Ok(pushdown)
389    }
390
391    fn statistics(&self) -> Option<Statistics> {
392        let db_type = self.conn_options.db_type();
393        if let Some(count1_query) = db_type.try_count1_query(&self.source) {
394            let conn_options = self.conn_options.clone();
395            let row_count_result = tokio::task::block_in_place(|| {
396                tokio::runtime::Handle::current().block_on(async {
397                    let pool = connect(&conn_options).await?;
398                    let conn = pool.get().await?;
399                    conn_options
400                        .db_type()
401                        .fetch_count(conn, &conn_options, &count1_query)
402                        .await
403                })
404            });
405
406            match row_count_result {
407                Ok(row_count) => {
408                    let column_stat =
409                        Statistics::unknown_column(self.transformed_table_schema.as_ref());
410                    Some(Statistics {
411                        num_rows: Precision::Exact(row_count),
412                        total_byte_size: Precision::Absent,
413                        column_statistics: column_stat,
414                    })
415                }
416                Err(e) => {
417                    warn!("[remote-table] Failed to fetch table statistics: {e}");
418                    None
419                }
420            }
421        } else {
422            debug!(
423                "[remote-table] Query can not be rewritten as count1 query: {}",
424                self.source
425            );
426            None
427        }
428    }
429
430    async fn insert_into(
431        &self,
432        _state: &dyn Session,
433        input: Arc<dyn ExecutionPlan>,
434        insert_op: InsertOp,
435    ) -> DFResult<Arc<dyn ExecutionPlan>> {
436        match insert_op {
437            InsertOp::Append => {}
438            InsertOp::Overwrite | InsertOp::Replace => {
439                return Err(DataFusionError::Execution(
440                    "Only support append insert operation".to_string(),
441                ));
442            }
443        }
444
445        let remote_schema = self
446            .remote_schema
447            .as_ref()
448            .ok_or(DataFusionError::Execution(
449                "Remote schema is not available".to_string(),
450            ))?
451            .clone();
452
453        let RemoteSource::Table(table) = &self.source else {
454            return Err(DataFusionError::Execution(
455                "Only support insert operation for table".to_string(),
456            ));
457        };
458
459        let now = std::time::Instant::now();
460        let conn = self.pool.get().await?;
461        debug!(
462            "[remote-table] Getting connection from pool cost: {}ms",
463            now.elapsed().as_millis()
464        );
465
466        let exec = RemoteTableInsertExec::new(
467            input,
468            self.conn_options.clone(),
469            self.literalizer.clone(),
470            table.clone(),
471            remote_schema,
472            conn,
473        );
474        Ok(Arc::new(exec))
475    }
476}