// datafusion_remote_table/table.rs

1use crate::{
2    connect, ConnectionOptions, DFResult, Pool, RemoteSchemaRef, RemoteTableExec, Transform,
3};
4use datafusion::arrow::datatypes::SchemaRef;
5use datafusion::catalog::{Session, TableProvider};
6use datafusion::datasource::TableType;
7use datafusion::logical_expr::Expr;
8use datafusion::physical_plan::ExecutionPlan;
9use std::any::Any;
10use std::sync::Arc;
11
12#[derive(Debug)]
13pub struct RemoteTable {
14    pub(crate) conn_options: ConnectionOptions,
15    pub(crate) sql: String,
16    pub(crate) table_schema: SchemaRef,
17    pub(crate) remote_schema: Option<RemoteSchemaRef>,
18    pub(crate) transform: Option<Arc<dyn Transform>>,
19    pub(crate) pool: Arc<dyn Pool>,
20}
21
22impl RemoteTable {
23    pub async fn try_new(
24        conn_options: ConnectionOptions,
25        sql: impl Into<String>,
26        table_schema: Option<SchemaRef>,
27        transform: Option<Arc<dyn Transform>>,
28    ) -> DFResult<Self> {
29        let sql = sql.into();
30        let pool = connect(&conn_options).await?;
31        let conn = pool.get().await?;
32        let (table_schema, remote_schema) = match conn.infer_schema(&sql, transform.clone()).await {
33            Ok((remote_schema, inferred_table_schema)) => (
34                table_schema.unwrap_or(inferred_table_schema),
35                Some(remote_schema),
36            ),
37            Err(e) => {
38                if let Some(table_schema) = table_schema {
39                    (table_schema, None)
40                } else {
41                    return Err(e);
42                }
43            }
44        };
45        Ok(RemoteTable {
46            conn_options,
47            sql,
48            table_schema,
49            remote_schema,
50            transform,
51            pool,
52        })
53    }
54
55    pub fn remote_schema(&self) -> Option<RemoteSchemaRef> {
56        self.remote_schema.clone()
57    }
58}
59
60#[async_trait::async_trait]
61impl TableProvider for RemoteTable {
62    fn as_any(&self) -> &dyn Any {
63        self
64    }
65
66    fn schema(&self) -> SchemaRef {
67        self.table_schema.clone()
68    }
69
70    fn table_type(&self) -> TableType {
71        TableType::View
72    }
73
74    async fn scan(
75        &self,
76        _state: &dyn Session,
77        projection: Option<&Vec<usize>>,
78        _filters: &[Expr],
79        _limit: Option<usize>,
80    ) -> DFResult<Arc<dyn ExecutionPlan>> {
81        Ok(Arc::new(RemoteTableExec::try_new(
82            self.conn_options.clone(),
83            self.sql.clone(),
84            self.table_schema.clone(),
85            self.remote_schema.clone(),
86            projection.cloned(),
87            self.transform.clone(),
88            self.pool.get().await?,
89        )?))
90    }
91}