datafusion_remote_table/connection/
mod.rs

1#[cfg(feature = "mysql")]
2mod mysql;
3#[cfg(feature = "oracle")]
4mod oracle;
5#[cfg(feature = "postgres")]
6mod postgres;
7#[cfg(feature = "sqlite")]
8mod sqlite;
9
10#[cfg(feature = "mysql")]
11pub use mysql::*;
12#[cfg(feature = "oracle")]
13pub use oracle::*;
14#[cfg(feature = "postgres")]
15pub use postgres::*;
16#[cfg(feature = "sqlite")]
17pub use sqlite::*;
18
19use crate::{DFResult, RemoteSchemaRef};
20use datafusion::arrow::datatypes::SchemaRef;
21use datafusion::execution::SendableRecordBatchStream;
22use std::fmt::Debug;
23#[cfg(feature = "sqlite")]
24use std::path::PathBuf;
25use std::sync::Arc;
26
27#[async_trait::async_trait]
28pub trait Pool: Debug + Send + Sync {
29    async fn get(&self) -> DFResult<Arc<dyn Connection>>;
30}
31
32#[async_trait::async_trait]
33pub trait Connection: Debug + Send + Sync {
34    async fn infer_schema(&self, sql: &str) -> DFResult<(RemoteSchemaRef, SchemaRef)>;
35
36    async fn query(
37        &self,
38        conn_options: &ConnectionOptions,
39        sql: &str,
40        table_schema: SchemaRef,
41        projection: Option<&Vec<usize>>,
42        limit: Option<usize>,
43    ) -> DFResult<SendableRecordBatchStream>;
44}
45
46pub async fn connect(options: &ConnectionOptions) -> DFResult<Arc<dyn Pool>> {
47    match options {
48        #[cfg(feature = "postgres")]
49        ConnectionOptions::Postgres(options) => {
50            let pool = connect_postgres(options).await?;
51            Ok(Arc::new(pool))
52        }
53        #[cfg(feature = "mysql")]
54        ConnectionOptions::Mysql(options) => {
55            let pool = connect_mysql(options)?;
56            Ok(Arc::new(pool))
57        }
58        #[cfg(feature = "oracle")]
59        ConnectionOptions::Oracle(options) => {
60            let pool = connect_oracle(options).await?;
61            Ok(Arc::new(pool))
62        }
63        #[cfg(feature = "sqlite")]
64        ConnectionOptions::Sqlite(path) => {
65            let pool = connect_sqlite(path).await?;
66            Ok(Arc::new(pool))
67        }
68    }
69}
70
71#[derive(Debug, Clone)]
72pub enum ConnectionOptions {
73    #[cfg(feature = "postgres")]
74    Postgres(PostgresConnectionOptions),
75    #[cfg(feature = "oracle")]
76    Oracle(OracleConnectionOptions),
77    #[cfg(feature = "mysql")]
78    Mysql(MysqlConnectionOptions),
79    #[cfg(feature = "sqlite")]
80    Sqlite(PathBuf),
81}
82
83impl ConnectionOptions {
84    pub fn stream_chunk_size(&self) -> usize {
85        match self {
86            #[cfg(feature = "postgres")]
87            ConnectionOptions::Postgres(options) => options.stream_chunk_size,
88            #[cfg(feature = "oracle")]
89            ConnectionOptions::Oracle(options) => options.stream_chunk_size,
90            #[cfg(feature = "mysql")]
91            ConnectionOptions::Mysql(options) => options.stream_chunk_size,
92            #[cfg(feature = "sqlite")]
93            ConnectionOptions::Sqlite(_) => unreachable!(),
94        }
95    }
96}
97
98pub(crate) fn projections_contains(projection: Option<&Vec<usize>>, col_idx: usize) -> bool {
99    match projection {
100        Some(p) => p.contains(&col_idx),
101        None => true,
102    }
103}
104
105#[cfg(any(feature = "mysql", feature = "postgres", feature = "oracle"))]
106fn big_decimal_to_i128(decimal: &bigdecimal::BigDecimal, scale: Option<i32>) -> Option<i128> {
107    use bigdecimal::{FromPrimitive, ToPrimitive};
108    let scale = scale.unwrap_or_else(|| {
109        decimal
110            .fractional_digit_count()
111            .try_into()
112            .unwrap_or_default()
113    });
114    let scale_decimal = bigdecimal::BigDecimal::from_f32(10f32.powi(scale))?;
115    (decimal * scale_decimal).to_i128()
116}