datafusion_remote_table/
table.rs

1use crate::{
2    connect, ConnectionOptions, DFResult, Pool, RemoteSchemaRef, RemoteTableExec, Transform,
3};
4use datafusion::arrow::datatypes::SchemaRef;
5use datafusion::catalog::{Session, TableProvider};
6use datafusion::datasource::TableType;
7use datafusion::error::DataFusionError;
8use datafusion::logical_expr::Expr;
9use datafusion::physical_plan::ExecutionPlan;
10use std::any::Any;
11use std::sync::Arc;
12
/// A DataFusion table backed by a SQL query that is run against a remote source.
///
/// Instances are built by the `try_new*` / `new_with_schema` constructors, which
/// open a connection pool and try to infer the schema of `sql`.
#[derive(Debug)]
pub struct RemoteTable {
    /// Options handed to `connect` when the pool is created; also forwarded to
    /// `RemoteTableExec` on every scan.
    pub(crate) conn_options: ConnectionOptions,
    /// SQL text used for schema inference and forwarded to `RemoteTableExec`.
    pub(crate) sql: String,
    /// Schema reported through `TableProvider::schema`: the caller-supplied
    /// schema when one was given, otherwise the inferred one.
    pub(crate) table_schema: SchemaRef,
    /// Remote-side schema returned by schema inference; `None` when inference
    /// failed and a caller-supplied table schema was used instead.
    pub(crate) remote_schema: Option<RemoteSchemaRef>,
    /// Optional transform passed to schema inference and to `RemoteTableExec`.
    pub(crate) transform: Option<Arc<dyn Transform>>,
    /// Pool returned by `connect`; each scan takes one connection from it.
    pub(crate) pool: Arc<dyn Pool>,
}
22
23impl RemoteTable {
24    pub async fn try_new(
25        conn_options: ConnectionOptions,
26        sql: impl Into<String>,
27    ) -> DFResult<Self> {
28        Self::try_new_inner(conn_options, sql, None, None).await
29    }
30
31    pub async fn new_with_schema(
32        conn_options: ConnectionOptions,
33        sql: impl Into<String>,
34        table_schema: SchemaRef,
35    ) -> Self {
36        Self::try_new_inner(conn_options, sql, Some(table_schema), None)
37            .await
38            .unwrap()
39    }
40
41    pub async fn try_new_with_transform(
42        conn_options: ConnectionOptions,
43        sql: impl Into<String>,
44        transform: Arc<dyn Transform>,
45    ) -> DFResult<Self> {
46        Self::try_new_inner(conn_options, sql, None, Some(transform)).await
47    }
48
49    pub(crate) async fn try_new_inner(
50        conn_options: ConnectionOptions,
51        sql: impl Into<String>,
52        table_schema: Option<SchemaRef>,
53        transform: Option<Arc<dyn Transform>>,
54    ) -> DFResult<Self> {
55        let sql = sql.into();
56        let pool = connect(&conn_options).await?;
57        let conn = pool.get().await?;
58        let (table_schema, remote_schema) = match conn.infer_schema(&sql, transform.clone()).await {
59            Ok((remote_schema, inferred_table_schema)) => (
60                table_schema.unwrap_or(inferred_table_schema),
61                Some(remote_schema),
62            ),
63            Err(e) => {
64                if let Some(table_schema) = table_schema {
65                    (table_schema, None)
66                } else {
67                    return Err(DataFusionError::Execution(format!(
68                        "Failed to infer schema: {e}"
69                    )));
70                }
71            }
72        };
73        Ok(RemoteTable {
74            conn_options,
75            sql,
76            table_schema,
77            remote_schema,
78            transform,
79            pool,
80        })
81    }
82
83    pub fn remote_schema(&self) -> Option<RemoteSchemaRef> {
84        self.remote_schema.clone()
85    }
86}
87
88#[async_trait::async_trait]
89impl TableProvider for RemoteTable {
90    fn as_any(&self) -> &dyn Any {
91        self
92    }
93
94    fn schema(&self) -> SchemaRef {
95        self.table_schema.clone()
96    }
97
98    fn table_type(&self) -> TableType {
99        TableType::View
100    }
101
102    async fn scan(
103        &self,
104        _state: &dyn Session,
105        projection: Option<&Vec<usize>>,
106        _filters: &[Expr],
107        _limit: Option<usize>,
108    ) -> DFResult<Arc<dyn ExecutionPlan>> {
109        Ok(Arc::new(RemoteTableExec::try_new(
110            self.conn_options.clone(),
111            self.sql.clone(),
112            self.table_schema.clone(),
113            self.remote_schema.clone(),
114            projection.cloned(),
115            self.transform.clone(),
116            self.pool.get().await?,
117        )?))
118    }
119}