datafusion_remote_table/connection/
mod.rs

1mod mysql;
2mod oracle;
3mod postgres;
4mod sqlite;
5
6pub use mysql::*;
7pub use oracle::*;
8pub use postgres::*;
9
10use crate::connection::sqlite::connect_sqlite;
11use crate::{DFResult, RemoteSchemaRef, Transform};
12use bigdecimal::ToPrimitive;
13use datafusion::arrow::datatypes::SchemaRef;
14use datafusion::execution::SendableRecordBatchStream;
15use std::fmt::Debug;
16use std::path::PathBuf;
17use std::sync::Arc;
18
19#[async_trait::async_trait]
20pub trait Pool: Debug + Send + Sync {
21    async fn get(&self) -> DFResult<Arc<dyn Connection>>;
22}
23
24#[async_trait::async_trait]
25pub trait Connection: Debug + Send + Sync {
26    // TODO could add limit 1 to query
27    async fn infer_schema(
28        &self,
29        sql: &str,
30        transform: Option<Arc<dyn Transform>>,
31    ) -> DFResult<(RemoteSchemaRef, SchemaRef)>;
32
33    async fn query(
34        &self,
35        conn_options: &ConnectionOptions,
36        sql: &str,
37        table_schema: SchemaRef,
38        projection: Option<&Vec<usize>>,
39    ) -> DFResult<SendableRecordBatchStream>;
40}
41
42pub async fn connect(options: &ConnectionOptions) -> DFResult<Arc<dyn Pool>> {
43    match options {
44        ConnectionOptions::Postgres(options) => {
45            let pool = connect_postgres(options).await?;
46            Ok(Arc::new(pool))
47        }
48        ConnectionOptions::Mysql(options) => {
49            let pool = connect_mysql(options)?;
50            Ok(Arc::new(pool))
51        }
52        ConnectionOptions::Oracle(options) => {
53            let pool = connect_oracle(options).await?;
54            Ok(Arc::new(pool))
55        }
56        ConnectionOptions::Sqlite(path) => {
57            let pool = connect_sqlite(path).await?;
58            Ok(Arc::new(pool))
59        }
60    }
61}
62
63#[derive(Debug, Clone)]
64pub enum ConnectionOptions {
65    Postgres(PostgresConnectionOptions),
66    Oracle(OracleConnectionOptions),
67    Mysql(MysqlConnectionOptions),
68    Sqlite(PathBuf),
69}
70
71impl ConnectionOptions {
72    pub fn chunk_size(&self) -> Option<usize> {
73        match self {
74            ConnectionOptions::Postgres(options) => options.chunk_size,
75            ConnectionOptions::Oracle(options) => options.chunk_size,
76            ConnectionOptions::Mysql(options) => options.chunk_size,
77            ConnectionOptions::Sqlite(_) => None,
78        }
79    }
80}
81
/// Reports whether column index `col_idx` is selected by `projection`.
///
/// A missing projection (`None`) is treated as selecting every column, so the
/// result is `true` for any index. Otherwise the index must appear in the list.
pub(crate) fn projections_contains(projection: Option<&Vec<usize>>, col_idx: usize) -> bool {
    projection.map_or(true, |selected| selected.contains(&col_idx))
}
88
89fn big_decimal_to_i128(decimal: &bigdecimal::BigDecimal, scale: Option<u32>) -> Option<i128> {
90    let scale = scale.unwrap_or_else(|| {
91        decimal
92            .fractional_digit_count()
93            .try_into()
94            .unwrap_or_default()
95    });
96    (decimal * 10i128.pow(scale)).to_i128()
97}