datafusion_remote_table/
table.rs1use crate::{
2 connect, ConnectionOptions, DFResult, Pool, RemoteSchemaRef, RemoteTableExec, Transform,
3};
4use datafusion::arrow::datatypes::SchemaRef;
5use datafusion::catalog::{Session, TableProvider};
6use datafusion::datasource::TableType;
7use datafusion::error::DataFusionError;
8use datafusion::logical_expr::Expr;
9use datafusion::physical_plan::ExecutionPlan;
10use std::any::Any;
11use std::sync::Arc;
12
13#[derive(Debug)]
14pub struct RemoteTable {
15 pub(crate) conn_options: ConnectionOptions,
16 pub(crate) sql: String,
17 pub(crate) table_schema: SchemaRef,
18 pub(crate) remote_schema: Option<RemoteSchemaRef>,
19 pub(crate) transform: Option<Arc<dyn Transform>>,
20 pub(crate) pool: Arc<dyn Pool>,
21}
22
23impl RemoteTable {
24 pub async fn try_new(
25 conn_options: ConnectionOptions,
26 sql: impl Into<String>,
27 ) -> DFResult<Self> {
28 Self::try_new_inner(conn_options, sql, None, None).await
29 }
30
31 pub async fn new_with_schema(
32 conn_options: ConnectionOptions,
33 sql: impl Into<String>,
34 table_schema: SchemaRef,
35 ) -> Self {
36 Self::try_new_inner(conn_options, sql, Some(table_schema), None)
37 .await
38 .unwrap()
39 }
40
41 pub async fn try_new_with_transform(
42 conn_options: ConnectionOptions,
43 sql: impl Into<String>,
44 transform: Arc<dyn Transform>,
45 ) -> DFResult<Self> {
46 Self::try_new_inner(conn_options, sql, None, Some(transform)).await
47 }
48
49 pub(crate) async fn try_new_inner(
50 conn_options: ConnectionOptions,
51 sql: impl Into<String>,
52 table_schema: Option<SchemaRef>,
53 transform: Option<Arc<dyn Transform>>,
54 ) -> DFResult<Self> {
55 let sql = sql.into();
56 let pool = connect(&conn_options).await?;
57 let conn = pool.get().await?;
58 let (table_schema, remote_schema) = match conn.infer_schema(&sql, transform.clone()).await {
59 Ok((remote_schema, inferred_table_schema)) => (
60 table_schema.unwrap_or(inferred_table_schema),
61 Some(remote_schema),
62 ),
63 Err(e) => {
64 if let Some(table_schema) = table_schema {
65 (table_schema, None)
66 } else {
67 return Err(DataFusionError::Execution(format!(
68 "Failed to infer schema: {e}"
69 )));
70 }
71 }
72 };
73 Ok(RemoteTable {
74 conn_options,
75 sql,
76 table_schema,
77 remote_schema,
78 transform,
79 pool,
80 })
81 }
82
83 pub fn remote_schema(&self) -> Option<RemoteSchemaRef> {
84 self.remote_schema.clone()
85 }
86}
87
88#[async_trait::async_trait]
89impl TableProvider for RemoteTable {
90 fn as_any(&self) -> &dyn Any {
91 self
92 }
93
94 fn schema(&self) -> SchemaRef {
95 self.table_schema.clone()
96 }
97
98 fn table_type(&self) -> TableType {
99 TableType::View
100 }
101
102 async fn scan(
103 &self,
104 _state: &dyn Session,
105 projection: Option<&Vec<usize>>,
106 _filters: &[Expr],
107 _limit: Option<usize>,
108 ) -> DFResult<Arc<dyn ExecutionPlan>> {
109 Ok(Arc::new(RemoteTableExec::try_new(
110 self.conn_options.clone(),
111 self.sql.clone(),
112 self.table_schema.clone(),
113 self.remote_schema.clone(),
114 projection.cloned(),
115 self.transform.clone(),
116 self.pool.get().await?,
117 )?))
118 }
119}