// datafusion_remote_table/table.rs
use crate::{connect, ConnectionOptions, DFResult, Pool, RemoteSchema, RemoteTableExec, Transform};
2use datafusion::arrow::datatypes::SchemaRef;
3use datafusion::catalog::{Session, TableProvider};
4use datafusion::common::project_schema;
5use datafusion::datasource::TableType;
6use datafusion::logical_expr::Expr;
7use datafusion::physical_plan::ExecutionPlan;
8use std::any::Any;
9use std::sync::Arc;
10
11#[derive(Debug)]
12pub struct RemoteTable {
13 pub(crate) conn_options: ConnectionOptions,
14 pub(crate) sql: String,
15 pub(crate) remote_schema: RemoteSchema,
16 pub(crate) schema: SchemaRef,
17 pub(crate) transform: Option<Arc<dyn Transform>>,
18 pub(crate) pool: Arc<dyn Pool>,
19}
20
21impl RemoteTable {
22 pub async fn try_new(
23 conn_options: ConnectionOptions,
24 sql: impl Into<String>,
25 transform: Option<Arc<dyn Transform>>,
26 ) -> DFResult<Self> {
27 let sql = sql.into();
28 let pool = connect(&conn_options).await?;
29 let conn = pool.get().await?;
30 let (remote_schema, arrow_schema) = conn.infer_schema(&sql, transform.clone()).await?;
31 Ok(RemoteTable {
32 conn_options,
33 sql,
34 remote_schema,
35 schema: arrow_schema,
36 transform,
37 pool,
38 })
39 }
40
41 pub fn remote_schema(&self) -> &RemoteSchema {
42 &self.remote_schema
43 }
44}
45
46#[async_trait::async_trait]
47impl TableProvider for RemoteTable {
48 fn as_any(&self) -> &dyn Any {
49 self
50 }
51
52 fn schema(&self) -> SchemaRef {
53 self.schema.clone()
54 }
55
56 fn table_type(&self) -> TableType {
57 TableType::View
58 }
59
60 async fn scan(
61 &self,
62 _state: &dyn Session,
63 projection: Option<&Vec<usize>>,
64 _filters: &[Expr],
65 _limit: Option<usize>,
66 ) -> DFResult<Arc<dyn ExecutionPlan>> {
67 let projected_schema = project_schema(&self.schema, projection)?;
68 Ok(Arc::new(RemoteTableExec::new(
69 self.conn_options.clone(),
70 projected_schema,
71 self.sql.clone(),
72 projection.cloned(),
73 self.transform.clone(),
74 self.pool.get().await?,
75 )))
76 }
77}