datafusion_remote_table/connection/
mod.rs1mod mysql;
2mod oracle;
3mod postgres;
4mod sqlite;
5
6pub use mysql::*;
7pub use oracle::*;
8pub use postgres::*;
9
10use crate::connection::sqlite::connect_sqlite;
11use crate::{DFResult, RemoteSchemaRef, Transform};
12use bigdecimal::ToPrimitive;
13use datafusion::arrow::datatypes::SchemaRef;
14use datafusion::execution::SendableRecordBatchStream;
15use std::fmt::Debug;
16use std::path::PathBuf;
17use std::sync::Arc;
18
19#[async_trait::async_trait]
20pub trait Pool: Debug + Send + Sync {
21 async fn get(&self) -> DFResult<Arc<dyn Connection>>;
22}
23
24#[async_trait::async_trait]
25pub trait Connection: Debug + Send + Sync {
26 async fn infer_schema(
28 &self,
29 sql: &str,
30 transform: Option<Arc<dyn Transform>>,
31 ) -> DFResult<(RemoteSchemaRef, SchemaRef)>;
32
33 async fn query(
34 &self,
35 conn_options: &ConnectionOptions,
36 sql: &str,
37 table_schema: SchemaRef,
38 projection: Option<&Vec<usize>>,
39 ) -> DFResult<SendableRecordBatchStream>;
40}
41
42pub async fn connect(options: &ConnectionOptions) -> DFResult<Arc<dyn Pool>> {
43 match options {
44 ConnectionOptions::Postgres(options) => {
45 let pool = connect_postgres(options).await?;
46 Ok(Arc::new(pool))
47 }
48 ConnectionOptions::Mysql(options) => {
49 let pool = connect_mysql(options)?;
50 Ok(Arc::new(pool))
51 }
52 ConnectionOptions::Oracle(options) => {
53 let pool = connect_oracle(options).await?;
54 Ok(Arc::new(pool))
55 }
56 ConnectionOptions::Sqlite(path) => {
57 let pool = connect_sqlite(path).await?;
58 Ok(Arc::new(pool))
59 }
60 }
61}
62
63#[derive(Debug, Clone)]
64pub enum ConnectionOptions {
65 Postgres(PostgresConnectionOptions),
66 Oracle(OracleConnectionOptions),
67 Mysql(MysqlConnectionOptions),
68 Sqlite(PathBuf),
69}
70
71impl ConnectionOptions {
72 pub fn chunk_size(&self) -> Option<usize> {
73 match self {
74 ConnectionOptions::Postgres(options) => options.chunk_size,
75 ConnectionOptions::Oracle(options) => options.chunk_size,
76 ConnectionOptions::Mysql(options) => options.chunk_size,
77 ConnectionOptions::Sqlite(_) => None,
78 }
79 }
80}
81
82pub(crate) fn projections_contains(projection: Option<&Vec<usize>>, col_idx: usize) -> bool {
83 match projection {
84 Some(p) => p.contains(&col_idx),
85 None => true,
86 }
87}
88
89fn big_decimal_to_i128(decimal: &bigdecimal::BigDecimal, scale: Option<u32>) -> Option<i128> {
90 let scale = scale.unwrap_or_else(|| {
91 decimal
92 .fractional_digit_count()
93 .try_into()
94 .unwrap_or_default()
95 });
96 (decimal * 10i128.pow(scale)).to_i128()
97}