datafusion_remote_table/connection/
mod.rs

1mod mysql;
2mod oracle;
3mod postgres;
4mod sqlite;
5
6pub use mysql::*;
7pub use oracle::*;
8pub use postgres::*;
9
10use crate::connection::sqlite::connect_sqlite;
11use crate::{DFResult, RemoteSchemaRef, Transform};
12use bigdecimal::ToPrimitive;
13use datafusion::arrow::datatypes::SchemaRef;
14use datafusion::execution::SendableRecordBatchStream;
15use std::fmt::Debug;
16use std::path::PathBuf;
17use std::sync::Arc;
18
19#[async_trait::async_trait]
20pub trait Pool: Debug + Send + Sync {
21    async fn get(&self) -> DFResult<Arc<dyn Connection>>;
22}
23
24#[async_trait::async_trait]
25pub trait Connection: Debug + Send + Sync {
26    // TODO could add limit 1 to query
27    async fn infer_schema(
28        &self,
29        sql: &str,
30        transform: Option<Arc<dyn Transform>>,
31    ) -> DFResult<(RemoteSchemaRef, SchemaRef)>;
32
33    async fn query(
34        &self,
35        sql: String,
36        table_schema: SchemaRef,
37        projection: Option<Vec<usize>>,
38    ) -> DFResult<SendableRecordBatchStream>;
39}
40
41pub async fn connect(options: &ConnectionOptions) -> DFResult<Arc<dyn Pool>> {
42    match options {
43        ConnectionOptions::Postgres(options) => {
44            let pool = connect_postgres(options).await?;
45            Ok(Arc::new(pool))
46        }
47        ConnectionOptions::Mysql(options) => {
48            let pool = connect_mysql(options)?;
49            Ok(Arc::new(pool))
50        }
51        ConnectionOptions::Oracle(options) => {
52            let pool = connect_oracle(options).await?;
53            Ok(Arc::new(pool))
54        }
55        ConnectionOptions::Sqlite(path) => {
56            let pool = connect_sqlite(path).await?;
57            Ok(Arc::new(pool))
58        }
59    }
60}
61
62#[derive(Debug, Clone)]
63pub enum ConnectionOptions {
64    Postgres(PostgresConnectionOptions),
65    Oracle(OracleConnectionOptions),
66    Mysql(MysqlConnectionOptions),
67    Sqlite(PathBuf),
68}
69
/// Reports whether column `col_idx` is selected by `projection`.
///
/// A missing projection (`None`) selects every column, so the answer is always
/// `true`; otherwise the index must appear in the projection list.
pub(crate) fn projections_contains(projection: Option<&Vec<usize>>, col_idx: usize) -> bool {
    let Some(selected) = projection else {
        // No projection means nothing was filtered out.
        return true;
    };
    selected.contains(&col_idx)
}
76
77fn big_decimal_to_i128(decimal: &bigdecimal::BigDecimal, scale: Option<u32>) -> Option<i128> {
78    let scale = scale.unwrap_or_else(|| {
79        decimal
80            .fractional_digit_count()
81            .try_into()
82            .unwrap_or_default()
83    });
84    (decimal * 10i128.pow(scale)).to_i128()
85}