datafusion_remote_table/
table.rs

1use crate::{connect, ConnectionOptions, DFResult, RemoteTableExec, Transform};
2use datafusion::arrow::datatypes::SchemaRef;
3use datafusion::catalog::{Session, TableProvider};
4use datafusion::common::project_schema;
5use datafusion::datasource::TableType;
6use datafusion::logical_expr::Expr;
7use datafusion::physical_plan::ExecutionPlan;
8use std::any::Any;
9use std::sync::Arc;
10
11#[derive(Debug)]
12pub struct RemoteTable {
13    pub(crate) conn_options: ConnectionOptions,
14    pub(crate) sql: String,
15    pub(crate) schema: SchemaRef,
16    pub(crate) transform: Option<Arc<dyn Transform>>,
17}
18
19impl RemoteTable {
20    pub async fn try_new(
21        conn_options: ConnectionOptions,
22        sql: String,
23        transform: Option<Arc<dyn Transform>>,
24    ) -> DFResult<Self> {
25        let conn = connect(&conn_options).await?;
26        let (_remote_schema, arrow_schema) = conn.infer_schema(&sql, transform.as_deref()).await?;
27        Ok(RemoteTable {
28            conn_options,
29            sql,
30            schema: arrow_schema,
31            transform,
32        })
33    }
34}
35
36#[async_trait::async_trait]
37impl TableProvider for RemoteTable {
38    fn as_any(&self) -> &dyn Any {
39        self
40    }
41
42    fn schema(&self) -> SchemaRef {
43        self.schema.clone()
44    }
45
46    fn table_type(&self) -> TableType {
47        TableType::Temporary
48    }
49
50    async fn scan(
51        &self,
52        _state: &dyn Session,
53        projection: Option<&Vec<usize>>,
54        _filters: &[Expr],
55        _limit: Option<usize>,
56    ) -> DFResult<Arc<dyn ExecutionPlan>> {
57        let projected_schema = project_schema(&self.schema, projection)?;
58        Ok(Arc::new(
59            RemoteTableExec::try_new(
60                self.conn_options.clone(),
61                projected_schema,
62                self.sql.clone(),
63                projection.cloned(),
64                self.transform.clone(),
65            )
66            .await?,
67        ))
68    }
69}