datafusion_remote_table/
table.rs1use crate::{
2 connect, ConnectionOptions, DFResult, Pool, RemoteSchemaRef, RemoteTableExec, Transform,
3};
4use datafusion::arrow::datatypes::SchemaRef;
5use datafusion::catalog::{Session, TableProvider};
6use datafusion::datasource::TableType;
7use datafusion::logical_expr::Expr;
8use datafusion::physical_plan::ExecutionPlan;
9use std::any::Any;
10use std::sync::Arc;
11
12#[derive(Debug)]
13pub struct RemoteTable {
14 pub(crate) conn_options: ConnectionOptions,
15 pub(crate) sql: String,
16 pub(crate) table_schema: SchemaRef,
17 pub(crate) remote_schema: Option<RemoteSchemaRef>,
18 pub(crate) transform: Option<Arc<dyn Transform>>,
19 pub(crate) pool: Arc<dyn Pool>,
20}
21
22impl RemoteTable {
23 pub async fn try_new(
24 conn_options: ConnectionOptions,
25 sql: impl Into<String>,
26 table_schema: Option<SchemaRef>,
27 transform: Option<Arc<dyn Transform>>,
28 ) -> DFResult<Self> {
29 let sql = sql.into();
30 let pool = connect(&conn_options).await?;
31 let conn = pool.get().await?;
32 let (table_schema, remote_schema) = match conn.infer_schema(&sql, transform.clone()).await {
33 Ok((remote_schema, inferred_table_schema)) => (
34 table_schema.unwrap_or(inferred_table_schema),
35 Some(remote_schema),
36 ),
37 Err(e) => {
38 if let Some(table_schema) = table_schema {
39 (table_schema, None)
40 } else {
41 return Err(e);
42 }
43 }
44 };
45 Ok(RemoteTable {
46 conn_options,
47 sql,
48 table_schema,
49 remote_schema,
50 transform,
51 pool,
52 })
53 }
54
55 pub fn remote_schema(&self) -> Option<RemoteSchemaRef> {
56 self.remote_schema.clone()
57 }
58}
59
60#[async_trait::async_trait]
61impl TableProvider for RemoteTable {
62 fn as_any(&self) -> &dyn Any {
63 self
64 }
65
66 fn schema(&self) -> SchemaRef {
67 self.table_schema.clone()
68 }
69
70 fn table_type(&self) -> TableType {
71 TableType::View
72 }
73
74 async fn scan(
75 &self,
76 _state: &dyn Session,
77 projection: Option<&Vec<usize>>,
78 _filters: &[Expr],
79 _limit: Option<usize>,
80 ) -> DFResult<Arc<dyn ExecutionPlan>> {
81 Ok(Arc::new(RemoteTableExec::try_new(
82 self.conn_options.clone(),
83 self.sql.clone(),
84 self.table_schema.clone(),
85 self.remote_schema.clone(),
86 projection.cloned(),
87 self.transform.clone(),
88 self.pool.get().await?,
89 )?))
90 }
91}