datafusion_remote_table/
table.rs1use crate::{
2 ConnectionOptions, DFResult, Pool, RemoteSchemaRef, RemoteTableExec, Transform, connect,
3 transform_schema,
4};
5use datafusion::arrow::datatypes::SchemaRef;
6use datafusion::catalog::{Session, TableProvider};
7use datafusion::datasource::TableType;
8use datafusion::error::DataFusionError;
9use datafusion::logical_expr::Expr;
10use datafusion::physical_plan::ExecutionPlan;
11use std::any::Any;
12use std::sync::Arc;
13
14#[derive(Debug)]
15pub struct RemoteTable {
16 pub(crate) conn_options: ConnectionOptions,
17 pub(crate) sql: String,
18 pub(crate) table_schema: SchemaRef,
19 pub(crate) transformed_table_schema: SchemaRef,
20 pub(crate) remote_schema: Option<RemoteSchemaRef>,
21 pub(crate) transform: Option<Arc<dyn Transform>>,
22 pub(crate) pool: Arc<dyn Pool>,
23}
24
25impl RemoteTable {
26 pub async fn try_new(
27 conn_options: ConnectionOptions,
28 sql: impl Into<String>,
29 ) -> DFResult<Self> {
30 Self::try_new_with_schema_transform(conn_options, sql, None, None).await
31 }
32
33 pub async fn try_new_with_schema(
34 conn_options: ConnectionOptions,
35 sql: impl Into<String>,
36 table_schema: SchemaRef,
37 ) -> DFResult<Self> {
38 Self::try_new_with_schema_transform(conn_options, sql, Some(table_schema), None).await
39 }
40
41 pub async fn try_new_with_transform(
42 conn_options: ConnectionOptions,
43 sql: impl Into<String>,
44 transform: Arc<dyn Transform>,
45 ) -> DFResult<Self> {
46 Self::try_new_with_schema_transform(conn_options, sql, None, Some(transform)).await
47 }
48
49 pub async fn try_new_with_schema_transform(
50 conn_options: ConnectionOptions,
51 sql: impl Into<String>,
52 table_schema: Option<SchemaRef>,
53 transform: Option<Arc<dyn Transform>>,
54 ) -> DFResult<Self> {
55 let sql = sql.into();
56 let pool = connect(&conn_options).await?;
57 let conn = pool.get().await?;
58 let (table_schema, remote_schema) = match conn.infer_schema(&sql).await {
59 Ok((remote_schema, inferred_table_schema)) => (
60 table_schema.unwrap_or(inferred_table_schema),
61 Some(remote_schema),
62 ),
63 Err(e) => {
64 if let Some(table_schema) = table_schema {
65 (table_schema, None)
66 } else {
67 return Err(DataFusionError::Execution(format!(
68 "Failed to infer schema: {e}"
69 )));
70 }
71 }
72 };
73 let transformed_table_schema = if let Some(transform) = transform.as_ref() {
74 transform_schema(
75 table_schema.clone(),
76 transform.as_ref(),
77 remote_schema.as_ref(),
78 )?
79 } else {
80 table_schema.clone()
81 };
82 Ok(RemoteTable {
83 conn_options,
84 sql,
85 table_schema,
86 transformed_table_schema,
87 remote_schema,
88 transform,
89 pool,
90 })
91 }
92
93 pub fn remote_schema(&self) -> Option<RemoteSchemaRef> {
94 self.remote_schema.clone()
95 }
96}
97
98#[async_trait::async_trait]
99impl TableProvider for RemoteTable {
100 fn as_any(&self) -> &dyn Any {
101 self
102 }
103
104 fn schema(&self) -> SchemaRef {
105 self.transformed_table_schema.clone()
106 }
107
108 fn table_type(&self) -> TableType {
109 TableType::View
110 }
111
112 async fn scan(
113 &self,
114 _state: &dyn Session,
115 projection: Option<&Vec<usize>>,
116 _filters: &[Expr],
117 _limit: Option<usize>,
118 ) -> DFResult<Arc<dyn ExecutionPlan>> {
119 Ok(Arc::new(RemoteTableExec::try_new(
121 self.conn_options.clone(),
122 self.sql.clone(),
123 self.table_schema.clone(),
124 self.remote_schema.clone(),
125 projection.cloned(),
126 self.transform.clone(),
127 self.pool.get().await?,
128 )?))
129 }
130}