datafusion_table_providers/
odbc.rs1use crate::sql::db_connection_pool::dbconnection::odbcconn::ODBCDbConnectionPool;
18use crate::sql::{
19 db_connection_pool as db_connection_pool_datafusion, sql_provider_datafusion::SqlTable,
20};
21use datafusion::arrow::datatypes::SchemaRef;
22use datafusion::error::DataFusionError;
23use datafusion::{datasource::TableProvider, sql::TableReference};
24use snafu::prelude::*;
25use std::sync::Arc;
26
27#[derive(Debug, Snafu)]
28pub enum Error {
29 #[snafu(display("DbConnectionError: {source}"))]
30 DbConnectionError {
31 source: db_connection_pool_datafusion::dbconnection::GenericError,
32 },
33 #[snafu(display("The table '{table_name}' doesn't exist in the Postgres server"))]
34 TableDoesntExist { table_name: String },
35
36 #[snafu(display("Unable to get a DB connection from the pool: {source}"))]
37 UnableToGetConnectionFromPool {
38 source: db_connection_pool_datafusion::Error,
39 },
40
41 #[snafu(display("Unable to get schema: {source}"))]
42 UnableToGetSchema {
43 source: db_connection_pool_datafusion::dbconnection::Error,
44 },
45
46 #[snafu(display("Unable to generate SQL: {source}"))]
47 UnableToGenerateSQL { source: DataFusionError },
48}
49
50type Result<T, E = Error> = std::result::Result<T, E>;
51
52pub struct ODBCTableFactory<'a> {
53 pool: Arc<ODBCDbConnectionPool<'a>>,
54}
55
56impl<'a> ODBCTableFactory<'a>
57where
58 'a: 'static,
59{
60 #[must_use]
61 pub fn new(pool: Arc<ODBCDbConnectionPool<'a>>) -> Self {
62 Self { pool }
63 }
64
65 pub async fn table_provider(
66 &self,
67 table_reference: TableReference,
68 _schema: Option<SchemaRef>,
69 ) -> Result<Arc<dyn TableProvider + 'static>, Box<dyn std::error::Error + Send + Sync>> {
70 let pool = Arc::clone(&self.pool);
71 let dyn_pool: Arc<ODBCDbConnectionPool<'a>> = pool;
72
73 let table = SqlTable::new("odbc", &dyn_pool, table_reference)
74 .await
75 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?;
76
77 let table_provider = Arc::new(table);
78
79 #[cfg(feature = "odbc-federation")]
80 let table_provider = Arc::new(
81 table_provider
82 .create_federated_table_provider()
83 .map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>)?,
84 );
85
86 Ok(table_provider)
87 }
88}