datafusion_remote_table/
table.rs

1use crate::{connect, ConnectionOptions, DFResult, Pool, RemoteTableExec, Transform};
2use datafusion::arrow::datatypes::SchemaRef;
3use datafusion::catalog::{Session, TableProvider};
4use datafusion::common::project_schema;
5use datafusion::datasource::TableType;
6use datafusion::logical_expr::Expr;
7use datafusion::physical_plan::ExecutionPlan;
8use std::any::Any;
9use std::sync::Arc;
10
11#[derive(Debug)]
12pub struct RemoteTable {
13    pub(crate) conn_options: ConnectionOptions,
14    pub(crate) sql: String,
15    pub(crate) schema: SchemaRef,
16    pub(crate) transform: Option<Arc<dyn Transform>>,
17    pub(crate) pool: Arc<dyn Pool>,
18}
19
20impl RemoteTable {
21    pub async fn try_new(
22        conn_options: ConnectionOptions,
23        sql: impl Into<String>,
24        transform: Option<Arc<dyn Transform>>,
25    ) -> DFResult<Self> {
26        let sql = sql.into();
27        let pool = connect(&conn_options).await?;
28        let conn = pool.get().await?;
29        let (_remote_schema, arrow_schema) = conn.infer_schema(&sql, transform.as_deref()).await?;
30        Ok(RemoteTable {
31            conn_options,
32            sql,
33            schema: arrow_schema,
34            transform,
35            pool,
36        })
37    }
38}
39
40#[async_trait::async_trait]
41impl TableProvider for RemoteTable {
42    fn as_any(&self) -> &dyn Any {
43        self
44    }
45
46    fn schema(&self) -> SchemaRef {
47        self.schema.clone()
48    }
49
50    fn table_type(&self) -> TableType {
51        TableType::View
52    }
53
54    async fn scan(
55        &self,
56        _state: &dyn Session,
57        projection: Option<&Vec<usize>>,
58        _filters: &[Expr],
59        _limit: Option<usize>,
60    ) -> DFResult<Arc<dyn ExecutionPlan>> {
61        let projected_schema = project_schema(&self.schema, projection)?;
62        Ok(Arc::new(RemoteTableExec::new(
63            self.conn_options.clone(),
64            projected_schema,
65            self.sql.clone(),
66            projection.cloned(),
67            self.transform.clone(),
68            self.pool.get().await?,
69        )))
70    }
71}