datafusion_remote_table/
table.rs

1use crate::{
2    ConnectionOptions, DFResult, Pool, RemoteSchemaRef, RemoteTableExec, Transform, connect,
3    transform_schema,
4};
5use datafusion::arrow::datatypes::SchemaRef;
6use datafusion::catalog::{Session, TableProvider};
7use datafusion::datasource::TableType;
8use datafusion::error::DataFusionError;
9use datafusion::logical_expr::Expr;
10use datafusion::physical_plan::ExecutionPlan;
11use std::any::Any;
12use std::sync::Arc;
13
/// A DataFusion [`TableProvider`] whose rows come from running a SQL query against a
/// remote database.
///
/// Instances are built by the `RemoteTable::try_new*` constructors. These open a
/// connection pool, try to infer the schema of `sql`, and optionally derive a
/// transformed schema through a [`Transform`].
#[derive(Debug)]
pub struct RemoteTable {
    /// Options used to open the connection pool; a clone is handed to every scan's
    /// execution plan.
    pub(crate) conn_options: ConnectionOptions,
    /// The SQL query whose result set this table exposes.
    pub(crate) sql: String,
    /// Arrow schema of the query result before any transform is applied (either
    /// caller-supplied or inferred at construction).
    pub(crate) table_schema: SchemaRef,
    /// Schema reported to DataFusion by `TableProvider::schema`. It is `table_schema`
    /// passed through the transform, or a clone of `table_schema` when no transform
    /// is configured.
    pub(crate) transformed_table_schema: SchemaRef,
    /// Database-side schema obtained from inference. The constructors leave this
    /// `None` when inference failed and a caller-supplied schema was used instead.
    pub(crate) remote_schema: Option<RemoteSchemaRef>,
    /// Optional transform, used to compute `transformed_table_schema` and passed to
    /// each scan's execution plan.
    pub(crate) transform: Option<Arc<dyn Transform>>,
    /// Connection pool created by `connect`; each `scan` takes one connection from it.
    pub(crate) pool: Arc<dyn Pool>,
}
24
25impl RemoteTable {
26    pub async fn try_new(
27        conn_options: ConnectionOptions,
28        sql: impl Into<String>,
29    ) -> DFResult<Self> {
30        Self::try_new_with_schema_transform(conn_options, sql, None, None).await
31    }
32
33    pub async fn try_new_with_schema(
34        conn_options: ConnectionOptions,
35        sql: impl Into<String>,
36        table_schema: SchemaRef,
37    ) -> DFResult<Self> {
38        Self::try_new_with_schema_transform(conn_options, sql, Some(table_schema), None).await
39    }
40
41    pub async fn try_new_with_transform(
42        conn_options: ConnectionOptions,
43        sql: impl Into<String>,
44        transform: Arc<dyn Transform>,
45    ) -> DFResult<Self> {
46        Self::try_new_with_schema_transform(conn_options, sql, None, Some(transform)).await
47    }
48
49    pub async fn try_new_with_schema_transform(
50        conn_options: ConnectionOptions,
51        sql: impl Into<String>,
52        table_schema: Option<SchemaRef>,
53        transform: Option<Arc<dyn Transform>>,
54    ) -> DFResult<Self> {
55        let sql = sql.into();
56        let pool = connect(&conn_options).await?;
57        let conn = pool.get().await?;
58        let (table_schema, remote_schema) = match conn.infer_schema(&sql).await {
59            Ok((remote_schema, inferred_table_schema)) => (
60                table_schema.unwrap_or(inferred_table_schema),
61                Some(remote_schema),
62            ),
63            Err(e) => {
64                if let Some(table_schema) = table_schema {
65                    (table_schema, None)
66                } else {
67                    return Err(DataFusionError::Execution(format!(
68                        "Failed to infer schema: {e}"
69                    )));
70                }
71            }
72        };
73        let transformed_table_schema = if let Some(transform) = transform.as_ref() {
74            transform_schema(
75                table_schema.clone(),
76                transform.as_ref(),
77                remote_schema.as_ref(),
78            )?
79        } else {
80            table_schema.clone()
81        };
82        Ok(RemoteTable {
83            conn_options,
84            sql,
85            table_schema,
86            transformed_table_schema,
87            remote_schema,
88            transform,
89            pool,
90        })
91    }
92
93    pub fn remote_schema(&self) -> Option<RemoteSchemaRef> {
94        self.remote_schema.clone()
95    }
96}
97
98#[async_trait::async_trait]
99impl TableProvider for RemoteTable {
100    fn as_any(&self) -> &dyn Any {
101        self
102    }
103
104    fn schema(&self) -> SchemaRef {
105        self.transformed_table_schema.clone()
106    }
107
108    fn table_type(&self) -> TableType {
109        TableType::View
110    }
111
112    async fn scan(
113        &self,
114        _state: &dyn Session,
115        projection: Option<&Vec<usize>>,
116        _filters: &[Expr],
117        _limit: Option<usize>,
118    ) -> DFResult<Arc<dyn ExecutionPlan>> {
119        // TODO support limit pushdown
120        Ok(Arc::new(RemoteTableExec::try_new(
121            self.conn_options.clone(),
122            self.sql.clone(),
123            self.table_schema.clone(),
124            self.remote_schema.clone(),
125            projection.cloned(),
126            self.transform.clone(),
127            self.pool.get().await?,
128        )?))
129    }
130}