datafusion_remote_table/connection/
mod.rs1mod mysql;
2mod oracle;
3mod postgres;
4mod sqlite;
5
6pub use mysql::*;
7pub use oracle::*;
8pub use postgres::*;
9
10use crate::connection::sqlite::connect_sqlite;
11use crate::{DFResult, RemoteSchema, Transform};
12use bigdecimal::ToPrimitive;
13use datafusion::arrow::datatypes::SchemaRef;
14use datafusion::execution::SendableRecordBatchStream;
15use std::fmt::Debug;
16use std::path::PathBuf;
17use std::sync::Arc;
18
19#[async_trait::async_trait]
20pub trait Pool: Debug + Send + Sync {
21 async fn get(&self) -> DFResult<Arc<dyn Connection>>;
22}
23
24#[async_trait::async_trait]
25pub trait Connection: Debug + Send + Sync {
26 async fn infer_schema(
28 &self,
29 sql: &str,
30 transform: Option<Arc<dyn Transform>>,
31 ) -> DFResult<(RemoteSchema, SchemaRef)>;
32
33 async fn query(
34 &self,
35 sql: String,
36 projection: Option<Vec<usize>>,
37 ) -> DFResult<(SendableRecordBatchStream, RemoteSchema)>;
38}
39
40pub async fn connect(options: &ConnectionOptions) -> DFResult<Arc<dyn Pool>> {
41 match options {
42 ConnectionOptions::Postgres(options) => {
43 let pool = connect_postgres(options).await?;
44 Ok(Arc::new(pool))
45 }
46 ConnectionOptions::Mysql(options) => {
47 let pool = connect_mysql(options)?;
48 Ok(Arc::new(pool))
49 }
50 ConnectionOptions::Oracle(options) => {
51 let pool = connect_oracle(options).await?;
52 Ok(Arc::new(pool))
53 }
54 ConnectionOptions::Sqlite(path) => {
55 let pool = connect_sqlite(path).await?;
56 Ok(Arc::new(pool))
57 }
58 }
59}
60
61#[derive(Debug, Clone)]
62pub enum ConnectionOptions {
63 Postgres(PostgresConnectionOptions),
64 Oracle(OracleConnectionOptions),
65 Mysql(MysqlConnectionOptions),
66 Sqlite(PathBuf),
67}
68
/// Reports whether column `col_idx` is selected by `projection`.
///
/// A missing projection (`None`) selects every column, so the result is
/// `true`; otherwise the result is `true` only if `col_idx` appears in the
/// projection list.
pub(crate) fn projections_contains(projection: Option<&Vec<usize>>, col_idx: usize) -> bool {
    projection.map_or(true, |selected| selected.contains(&col_idx))
}
75
76fn big_decimal_to_i128(decimal: &bigdecimal::BigDecimal, scale: Option<u32>) -> Option<i128> {
77 let scale = scale.unwrap_or_else(|| {
78 decimal
79 .fractional_digit_count()
80 .try_into()
81 .unwrap_or_default()
82 });
83 (decimal * 10i128.pow(scale)).to_i128()
84}