datafusion_remote_table/
table.rs

1use crate::{connect, ConnectionOptions, DFResult, Pool, RemoteSchema, RemoteTableExec, Transform};
2use datafusion::arrow::datatypes::SchemaRef;
3use datafusion::catalog::{Session, TableProvider};
4use datafusion::common::project_schema;
5use datafusion::datasource::TableType;
6use datafusion::logical_expr::Expr;
7use datafusion::physical_plan::ExecutionPlan;
8use std::any::Any;
9use std::sync::Arc;
10
11#[derive(Debug)]
12pub struct RemoteTable {
13    pub(crate) conn_options: ConnectionOptions,
14    pub(crate) sql: String,
15    pub(crate) remote_schema: RemoteSchema,
16    pub(crate) schema: SchemaRef,
17    pub(crate) transform: Option<Arc<dyn Transform>>,
18    pub(crate) pool: Arc<dyn Pool>,
19}
20
21impl RemoteTable {
22    pub async fn try_new(
23        conn_options: ConnectionOptions,
24        sql: impl Into<String>,
25        transform: Option<Arc<dyn Transform>>,
26    ) -> DFResult<Self> {
27        let sql = sql.into();
28        let pool = connect(&conn_options).await?;
29        let conn = pool.get().await?;
30        let (remote_schema, arrow_schema) = conn.infer_schema(&sql, transform.clone()).await?;
31        Ok(RemoteTable {
32            conn_options,
33            sql,
34            remote_schema,
35            schema: arrow_schema,
36            transform,
37            pool,
38        })
39    }
40
41    pub fn remote_schema(&self) -> &RemoteSchema {
42        &self.remote_schema
43    }
44}
45
46#[async_trait::async_trait]
47impl TableProvider for RemoteTable {
48    fn as_any(&self) -> &dyn Any {
49        self
50    }
51
52    fn schema(&self) -> SchemaRef {
53        self.schema.clone()
54    }
55
56    fn table_type(&self) -> TableType {
57        TableType::View
58    }
59
60    async fn scan(
61        &self,
62        _state: &dyn Session,
63        projection: Option<&Vec<usize>>,
64        _filters: &[Expr],
65        _limit: Option<usize>,
66    ) -> DFResult<Arc<dyn ExecutionPlan>> {
67        let projected_schema = project_schema(&self.schema, projection)?;
68        Ok(Arc::new(RemoteTableExec::new(
69            self.conn_options.clone(),
70            projected_schema,
71            self.sql.clone(),
72            projection.cloned(),
73            self.transform.clone(),
74            self.pool.get().await?,
75        )))
76    }
77}