datafusion_remote_table/
table.rs

1use crate::{
2    ConnectionOptions, DFResult, DefaultLiteralizer, DefaultTransform, Literalize, Pool,
3    RemoteDbType, RemoteSchema, RemoteSchemaRef, RemoteTableInsertExec, RemoteTableScanExec,
4    Transform, TransformArgs, connect, transform_schema,
5};
6use datafusion::arrow::datatypes::SchemaRef;
7use datafusion::catalog::{Session, TableProvider};
8use datafusion::common::Statistics;
9use datafusion::common::stats::Precision;
10use datafusion::datasource::TableType;
11use datafusion::error::DataFusionError;
12use datafusion::logical_expr::dml::InsertOp;
13use datafusion::logical_expr::{Expr, TableProviderFilterPushDown};
14use datafusion::physical_plan::ExecutionPlan;
15use log::{debug, warn};
16use std::any::Any;
17use std::sync::Arc;
18
/// Describes what a remote table reads from.
#[derive(Debug, Clone)]
pub enum RemoteSource {
    /// SQL text supplied by the caller; `RemoteSource::query` returns it unchanged.
    Query(String),
    /// Parts of a table identifier, in order. They are joined with `.` when the
    /// source is displayed and passed to the database type to build a
    /// select-all query.
    Table(Vec<String>),
}
24
25impl RemoteSource {
26    pub fn query(&self, db_type: RemoteDbType) -> String {
27        match self {
28            RemoteSource::Query(query) => query.clone(),
29            RemoteSource::Table(table_identifiers) => db_type.select_all_query(table_identifiers),
30        }
31    }
32}
33
34impl std::fmt::Display for RemoteSource {
35    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
36        match self {
37            RemoteSource::Query(query) => write!(f, "{query}"),
38            RemoteSource::Table(table) => write!(f, "{}", table.join(".")),
39        }
40    }
41}
42
43impl From<String> for RemoteSource {
44    fn from(query: String) -> Self {
45        RemoteSource::Query(query)
46    }
47}
48
49impl From<&String> for RemoteSource {
50    fn from(query: &String) -> Self {
51        RemoteSource::Query(query.clone())
52    }
53}
54
55impl From<&str> for RemoteSource {
56    fn from(query: &str) -> Self {
57        RemoteSource::Query(query.to_string())
58    }
59}
60
61impl From<Vec<String>> for RemoteSource {
62    fn from(table_identifiers: Vec<String>) -> Self {
63        RemoteSource::Table(table_identifiers)
64    }
65}
66
67impl From<Vec<&str>> for RemoteSource {
68    fn from(table_identifiers: Vec<&str>) -> Self {
69        RemoteSource::Table(
70            table_identifiers
71                .into_iter()
72                .map(|s| s.to_string())
73                .collect(),
74        )
75    }
76}
77
78impl From<Vec<&String>> for RemoteSource {
79    fn from(table_identifiers: Vec<&String>) -> Self {
80        RemoteSource::Table(table_identifiers.into_iter().cloned().collect())
81    }
82}
83
84#[derive(Debug)]
85pub struct RemoteTable {
86    pub(crate) conn_options: Arc<ConnectionOptions>,
87    pub(crate) source: RemoteSource,
88    pub(crate) table_schema: SchemaRef,
89    pub(crate) transformed_table_schema: SchemaRef,
90    pub(crate) remote_schema: Option<RemoteSchemaRef>,
91    pub(crate) transform: Arc<dyn Transform>,
92    pub(crate) literalizer: Arc<dyn Literalize>,
93    pub(crate) pool: Arc<dyn Pool>,
94}
95
96impl RemoteTable {
97    pub async fn try_new(
98        conn_options: impl Into<ConnectionOptions>,
99        source: impl Into<RemoteSource>,
100    ) -> DFResult<Self> {
101        Self::try_new_with_schema_transform_literalizer(
102            conn_options,
103            source,
104            None,
105            None,
106            Arc::new(DefaultTransform {}),
107            Arc::new(DefaultLiteralizer {}),
108        )
109        .await
110    }
111
112    pub async fn try_new_with_schema(
113        conn_options: impl Into<ConnectionOptions>,
114        source: impl Into<RemoteSource>,
115        table_schema: SchemaRef,
116    ) -> DFResult<Self> {
117        Self::try_new_with_schema_transform_literalizer(
118            conn_options,
119            source,
120            Some(table_schema),
121            None,
122            Arc::new(DefaultTransform {}),
123            Arc::new(DefaultLiteralizer {}),
124        )
125        .await
126    }
127
128    pub async fn try_new_with_remote_schema(
129        conn_options: impl Into<ConnectionOptions>,
130        source: impl Into<RemoteSource>,
131        remote_schema: RemoteSchemaRef,
132    ) -> DFResult<Self> {
133        Self::try_new_with_schema_transform_literalizer(
134            conn_options,
135            source,
136            None,
137            Some(remote_schema),
138            Arc::new(DefaultTransform {}),
139            Arc::new(DefaultLiteralizer {}),
140        )
141        .await
142    }
143
144    pub async fn try_new_with_transform(
145        conn_options: impl Into<ConnectionOptions>,
146        source: impl Into<RemoteSource>,
147        transform: Arc<dyn Transform>,
148    ) -> DFResult<Self> {
149        Self::try_new_with_schema_transform_literalizer(
150            conn_options,
151            source,
152            None,
153            None,
154            transform,
155            Arc::new(DefaultLiteralizer {}),
156        )
157        .await
158    }
159
160    pub async fn try_new_with_schema_transform(
161        conn_options: impl Into<ConnectionOptions>,
162        source: impl Into<RemoteSource>,
163        table_schema: SchemaRef,
164        transform: Arc<dyn Transform>,
165    ) -> DFResult<Self> {
166        Self::try_new_with_schema_transform_literalizer(
167            conn_options,
168            source,
169            Some(table_schema),
170            None,
171            transform,
172            Arc::new(DefaultLiteralizer {}),
173        )
174        .await
175    }
176
177    pub async fn try_new_with_remote_schema_transform(
178        conn_options: impl Into<ConnectionOptions>,
179        source: impl Into<RemoteSource>,
180        remote_schema: RemoteSchemaRef,
181        transform: Arc<dyn Transform>,
182    ) -> DFResult<Self> {
183        Self::try_new_with_schema_transform_literalizer(
184            conn_options,
185            source,
186            None,
187            Some(remote_schema),
188            transform,
189            Arc::new(DefaultLiteralizer {}),
190        )
191        .await
192    }
193
194    pub async fn try_new_with_schema_transform_literalizer(
195        conn_options: impl Into<ConnectionOptions>,
196        source: impl Into<RemoteSource>,
197        table_schema: Option<SchemaRef>,
198        remote_schema: Option<RemoteSchemaRef>,
199        transform: Arc<dyn Transform>,
200        literalizer: Arc<dyn Literalize>,
201    ) -> DFResult<Self> {
202        let conn_options = conn_options.into();
203        let source = source.into();
204
205        if let RemoteSource::Table(table) = &source
206            && table.is_empty()
207        {
208            return Err(DataFusionError::Plan(
209                "Table source is empty vec".to_string(),
210            ));
211        }
212
213        let now = std::time::Instant::now();
214        let pool = connect(&conn_options).await?;
215        debug!(
216            "[remote-table] Creating connection pool cost: {}ms",
217            now.elapsed().as_millis()
218        );
219
220        let infer_schema_fn =
221            async |pool: &Arc<dyn Pool>, source: &RemoteSource| -> DFResult<RemoteSchemaRef> {
222                let now = std::time::Instant::now();
223                let conn = pool.get().await?;
224                let remote_schema = conn.infer_schema(source).await?;
225                debug!(
226                    "[remote-table] Inferring remote schema cost: {}ms",
227                    now.elapsed().as_millis()
228                );
229                Ok(remote_schema)
230            };
231
232        let (table_schema, remote_schema_opt): (SchemaRef, Option<RemoteSchemaRef>) =
233            match (table_schema, remote_schema) {
234                (Some(table_schema), Some(remote_schema)) => (table_schema, Some(remote_schema)),
235                (Some(table_schema), None) => {
236                    let remote_schema = if transform.as_any().is::<DefaultTransform>()
237                        && matches!(source, RemoteSource::Query(_))
238                    {
239                        None
240                    } else {
241                        // Infer remote schema
242                        let remote_schema = infer_schema_fn(&pool, &source).await?;
243                        Some(remote_schema)
244                    };
245                    (table_schema, remote_schema)
246                }
247                (None, Some(remote_schema)) => (
248                    Arc::new(remote_schema.to_arrow_schema()),
249                    Some(remote_schema),
250                ),
251                (None, None) => {
252                    // Infer table schema
253                    let remote_schema = infer_schema_fn(&pool, &source).await?;
254                    let inferred_table_schema = Arc::new(remote_schema.to_arrow_schema());
255                    (inferred_table_schema, Some(remote_schema))
256                }
257            };
258
259        if let Some(remote_schema) = &remote_schema_opt
260            && table_schema.fields.len() != remote_schema.fields.len()
261        {
262            return Err(DataFusionError::Plan(format!(
263                "fields length of table schema is not matched with remote schema. table schema: {table_schema}, remote schema: {remote_schema:?}"
264            )));
265        }
266
267        let transformed_table_schema = transform_schema(
268            transform.as_ref(),
269            table_schema.clone(),
270            remote_schema_opt.as_ref(),
271            conn_options.db_type(),
272        )?;
273
274        Ok(RemoteTable {
275            conn_options: Arc::new(conn_options),
276            source,
277            table_schema,
278            transformed_table_schema,
279            remote_schema: remote_schema_opt,
280            transform,
281            literalizer,
282            pool,
283        })
284    }
285
286    pub fn remote_schema(&self) -> Option<RemoteSchemaRef> {
287        self.remote_schema.clone()
288    }
289}
290
291#[async_trait::async_trait]
292impl TableProvider for RemoteTable {
293    fn as_any(&self) -> &dyn Any {
294        self
295    }
296
297    fn schema(&self) -> SchemaRef {
298        self.transformed_table_schema.clone()
299    }
300
301    fn table_type(&self) -> TableType {
302        TableType::Base
303    }
304
305    async fn scan(
306        &self,
307        _state: &dyn Session,
308        projection: Option<&Vec<usize>>,
309        filters: &[Expr],
310        limit: Option<usize>,
311    ) -> DFResult<Arc<dyn ExecutionPlan>> {
312        let remote_schema = if self.transform.as_any().is::<DefaultTransform>() {
313            Arc::new(RemoteSchema::empty())
314        } else {
315            let Some(remote_schema) = &self.remote_schema else {
316                return Err(DataFusionError::Plan(
317                    "remote schema is none but transform is not DefaultTransform".to_string(),
318                ));
319            };
320            remote_schema.clone()
321        };
322        let mut unparsed_filters = vec![];
323        for filter in filters {
324            let args = TransformArgs {
325                db_type: self.conn_options.db_type(),
326                table_schema: &self.table_schema,
327                remote_schema: &remote_schema,
328            };
329            unparsed_filters.push(self.transform.unparse_filter(filter, args)?);
330        }
331
332        let now = std::time::Instant::now();
333        let conn = self.pool.get().await?;
334        debug!(
335            "[remote-table] Getting connection from pool cost: {}ms",
336            now.elapsed().as_millis()
337        );
338
339        Ok(Arc::new(RemoteTableScanExec::try_new(
340            self.conn_options.clone(),
341            self.source.clone(),
342            self.table_schema.clone(),
343            self.remote_schema.clone(),
344            projection.cloned(),
345            unparsed_filters,
346            limit,
347            self.transform.clone(),
348            conn,
349        )?))
350    }
351
352    fn supports_filters_pushdown(
353        &self,
354        filters: &[&Expr],
355    ) -> DFResult<Vec<TableProviderFilterPushDown>> {
356        let db_type = self.conn_options.db_type();
357        if !db_type.support_rewrite_with_filters_limit(&self.source) {
358            return Ok(vec![
359                TableProviderFilterPushDown::Unsupported;
360                filters.len()
361            ]);
362        }
363
364        let remote_schema = if self.transform.as_any().is::<DefaultTransform>() {
365            Arc::new(RemoteSchema::empty())
366        } else {
367            let Some(remote_schema) = &self.remote_schema else {
368                return Err(DataFusionError::Plan(
369                    "remote schema is none but transform is not DefaultTransform".to_string(),
370                ));
371            };
372            remote_schema.clone()
373        };
374
375        let mut pushdown = vec![];
376        for filter in filters {
377            let args = TransformArgs {
378                db_type: self.conn_options.db_type(),
379                table_schema: &self.table_schema,
380                remote_schema: &remote_schema,
381            };
382            pushdown.push(self.transform.support_filter_pushdown(filter, args)?);
383        }
384        Ok(pushdown)
385    }
386
387    fn statistics(&self) -> Option<Statistics> {
388        let db_type = self.conn_options.db_type();
389        if let Some(count1_query) = db_type.try_count1_query(&self.source) {
390            let conn_options = self.conn_options.clone();
391            let row_count_result = tokio::task::block_in_place(|| {
392                tokio::runtime::Handle::current().block_on(async {
393                    let pool = connect(&conn_options).await?;
394                    let conn = pool.get().await?;
395                    conn_options
396                        .db_type()
397                        .fetch_count(conn, &conn_options, &count1_query)
398                        .await
399                })
400            });
401
402            match row_count_result {
403                Ok(row_count) => {
404                    let column_stat =
405                        Statistics::unknown_column(self.transformed_table_schema.as_ref());
406                    Some(Statistics {
407                        num_rows: Precision::Exact(row_count),
408                        total_byte_size: Precision::Absent,
409                        column_statistics: column_stat,
410                    })
411                }
412                Err(e) => {
413                    warn!("[remote-table] Failed to fetch table statistics: {e}");
414                    None
415                }
416            }
417        } else {
418            debug!(
419                "[remote-table] Query can not be rewritten as count1 query: {}",
420                self.source
421            );
422            None
423        }
424    }
425
426    async fn insert_into(
427        &self,
428        _state: &dyn Session,
429        input: Arc<dyn ExecutionPlan>,
430        insert_op: InsertOp,
431    ) -> DFResult<Arc<dyn ExecutionPlan>> {
432        match insert_op {
433            InsertOp::Append => {}
434            InsertOp::Overwrite | InsertOp::Replace => {
435                return Err(DataFusionError::Execution(
436                    "Only support append insert operation".to_string(),
437                ));
438            }
439        }
440
441        let remote_schema = self
442            .remote_schema
443            .as_ref()
444            .ok_or(DataFusionError::Execution(
445                "Remote schema is not available".to_string(),
446            ))?
447            .clone();
448
449        let RemoteSource::Table(table) = &self.source else {
450            return Err(DataFusionError::Execution(
451                "Only support insert operation for table".to_string(),
452            ));
453        };
454
455        let now = std::time::Instant::now();
456        let conn = self.pool.get().await?;
457        debug!(
458            "[remote-table] Getting connection from pool cost: {}ms",
459            now.elapsed().as_millis()
460        );
461
462        let exec = RemoteTableInsertExec::new(
463            input,
464            self.conn_options.clone(),
465            self.literalizer.clone(),
466            table.clone(),
467            remote_schema,
468            conn,
469        );
470        Ok(Arc::new(exec))
471    }
472}