datafusion_remote_table/
table.rs1use crate::{connect, ConnectionOptions, DFResult, RemoteTableExec, Transform};
2use datafusion::arrow::datatypes::SchemaRef;
3use datafusion::catalog::{Session, TableProvider};
4use datafusion::common::project_schema;
5use datafusion::datasource::TableType;
6use datafusion::logical_expr::Expr;
7use datafusion::physical_plan::ExecutionPlan;
8use std::any::Any;
9use std::sync::Arc;
10
11#[derive(Debug)]
12pub struct RemoteTable {
13 pub(crate) conn_options: ConnectionOptions,
14 pub(crate) sql: String,
15 pub(crate) schema: SchemaRef,
16 pub(crate) transform: Option<Arc<dyn Transform>>,
17}
18
19impl RemoteTable {
20 pub async fn try_new(
21 conn_options: ConnectionOptions,
22 sql: String,
23 transform: Option<Arc<dyn Transform>>,
24 ) -> DFResult<Self> {
25 let conn = connect(&conn_options).await?;
26 let (_remote_schema, arrow_schema) = conn.infer_schema(&sql, transform.as_deref()).await?;
27 Ok(RemoteTable {
28 conn_options,
29 sql,
30 schema: arrow_schema,
31 transform,
32 })
33 }
34}
35
36#[async_trait::async_trait]
37impl TableProvider for RemoteTable {
38 fn as_any(&self) -> &dyn Any {
39 self
40 }
41
42 fn schema(&self) -> SchemaRef {
43 self.schema.clone()
44 }
45
46 fn table_type(&self) -> TableType {
47 TableType::Temporary
48 }
49
50 async fn scan(
51 &self,
52 _state: &dyn Session,
53 projection: Option<&Vec<usize>>,
54 _filters: &[Expr],
55 _limit: Option<usize>,
56 ) -> DFResult<Arc<dyn ExecutionPlan>> {
57 let projected_schema = project_schema(&self.schema, projection)?;
58 Ok(Arc::new(
59 RemoteTableExec::try_new(
60 self.conn_options.clone(),
61 projected_schema,
62 self.sql.clone(),
63 projection.cloned(),
64 self.transform.clone(),
65 )
66 .await?,
67 ))
68 }
69}