datafusion_remote_table/
table.rs1use crate::{connect, ConnectionOptions, DFResult, Pool, RemoteTableExec, Transform};
2use datafusion::arrow::datatypes::SchemaRef;
3use datafusion::catalog::{Session, TableProvider};
4use datafusion::common::project_schema;
5use datafusion::datasource::TableType;
6use datafusion::logical_expr::Expr;
7use datafusion::physical_plan::ExecutionPlan;
8use std::any::Any;
9use std::sync::Arc;
10
11#[derive(Debug)]
12pub struct RemoteTable {
13 pub(crate) conn_options: ConnectionOptions,
14 pub(crate) sql: String,
15 pub(crate) schema: SchemaRef,
16 pub(crate) transform: Option<Arc<dyn Transform>>,
17 pub(crate) pool: Arc<dyn Pool>,
18}
19
20impl RemoteTable {
21 pub async fn try_new(
22 conn_options: ConnectionOptions,
23 sql: impl Into<String>,
24 transform: Option<Arc<dyn Transform>>,
25 ) -> DFResult<Self> {
26 let sql = sql.into();
27 let pool = connect(&conn_options).await?;
28 let conn = pool.get().await?;
29 let (_remote_schema, arrow_schema) = conn.infer_schema(&sql, transform.as_deref()).await?;
30 Ok(RemoteTable {
31 conn_options,
32 sql,
33 schema: arrow_schema,
34 transform,
35 pool,
36 })
37 }
38}
39
40#[async_trait::async_trait]
41impl TableProvider for RemoteTable {
42 fn as_any(&self) -> &dyn Any {
43 self
44 }
45
46 fn schema(&self) -> SchemaRef {
47 self.schema.clone()
48 }
49
50 fn table_type(&self) -> TableType {
51 TableType::View
52 }
53
54 async fn scan(
55 &self,
56 _state: &dyn Session,
57 projection: Option<&Vec<usize>>,
58 _filters: &[Expr],
59 _limit: Option<usize>,
60 ) -> DFResult<Arc<dyn ExecutionPlan>> {
61 let projected_schema = project_schema(&self.schema, projection)?;
62 Ok(Arc::new(RemoteTableExec::new(
63 self.conn_options.clone(),
64 projected_schema,
65 self.sql.clone(),
66 projection.cloned(),
67 self.transform.clone(),
68 self.pool.get().await?,
69 )))
70 }
71}