datafusion_remote_table/connection/
mod.rs1#[cfg(feature = "mysql")]
2mod mysql;
3#[cfg(feature = "oracle")]
4mod oracle;
5#[cfg(feature = "postgres")]
6mod postgres;
7#[cfg(feature = "sqlite")]
8mod sqlite;
9
10#[cfg(feature = "mysql")]
11pub use mysql::*;
12#[cfg(feature = "oracle")]
13pub use oracle::*;
14#[cfg(feature = "postgres")]
15pub use postgres::*;
16#[cfg(feature = "sqlite")]
17pub use sqlite::*;
18
19use crate::{DFResult, RemoteSchemaRef};
20use datafusion::arrow::datatypes::SchemaRef;
21use datafusion::execution::SendableRecordBatchStream;
22use std::fmt::Debug;
23#[cfg(feature = "sqlite")]
24use std::path::PathBuf;
25use std::sync::Arc;
26
27#[async_trait::async_trait]
28pub trait Pool: Debug + Send + Sync {
29 async fn get(&self) -> DFResult<Arc<dyn Connection>>;
30}
31
32#[async_trait::async_trait]
33pub trait Connection: Debug + Send + Sync {
34 async fn infer_schema(&self, sql: &str) -> DFResult<(RemoteSchemaRef, SchemaRef)>;
35
36 async fn query(
37 &self,
38 conn_options: &ConnectionOptions,
39 sql: &str,
40 table_schema: SchemaRef,
41 projection: Option<&Vec<usize>>,
42 limit: Option<usize>,
43 ) -> DFResult<SendableRecordBatchStream>;
44}
45
46pub async fn connect(options: &ConnectionOptions) -> DFResult<Arc<dyn Pool>> {
47 match options {
48 #[cfg(feature = "postgres")]
49 ConnectionOptions::Postgres(options) => {
50 let pool = connect_postgres(options).await?;
51 Ok(Arc::new(pool))
52 }
53 #[cfg(feature = "mysql")]
54 ConnectionOptions::Mysql(options) => {
55 let pool = connect_mysql(options)?;
56 Ok(Arc::new(pool))
57 }
58 #[cfg(feature = "oracle")]
59 ConnectionOptions::Oracle(options) => {
60 let pool = connect_oracle(options).await?;
61 Ok(Arc::new(pool))
62 }
63 #[cfg(feature = "sqlite")]
64 ConnectionOptions::Sqlite(path) => {
65 let pool = connect_sqlite(path).await?;
66 Ok(Arc::new(pool))
67 }
68 }
69}
70
71#[derive(Debug, Clone)]
72pub enum ConnectionOptions {
73 #[cfg(feature = "postgres")]
74 Postgres(PostgresConnectionOptions),
75 #[cfg(feature = "oracle")]
76 Oracle(OracleConnectionOptions),
77 #[cfg(feature = "mysql")]
78 Mysql(MysqlConnectionOptions),
79 #[cfg(feature = "sqlite")]
80 Sqlite(PathBuf),
81}
82
83impl ConnectionOptions {
84 pub fn stream_chunk_size(&self) -> usize {
85 match self {
86 #[cfg(feature = "postgres")]
87 ConnectionOptions::Postgres(options) => options.stream_chunk_size,
88 #[cfg(feature = "oracle")]
89 ConnectionOptions::Oracle(options) => options.stream_chunk_size,
90 #[cfg(feature = "mysql")]
91 ConnectionOptions::Mysql(options) => options.stream_chunk_size,
92 #[cfg(feature = "sqlite")]
93 ConnectionOptions::Sqlite(_) => unreachable!(),
94 }
95 }
96}
97
98pub(crate) fn projections_contains(projection: Option<&Vec<usize>>, col_idx: usize) -> bool {
99 match projection {
100 Some(p) => p.contains(&col_idx),
101 None => true,
102 }
103}
104
105#[cfg(any(feature = "mysql", feature = "postgres", feature = "oracle"))]
106fn big_decimal_to_i128(decimal: &bigdecimal::BigDecimal, scale: Option<i32>) -> Option<i128> {
107 use bigdecimal::{FromPrimitive, ToPrimitive};
108 let scale = scale.unwrap_or_else(|| {
109 decimal
110 .fractional_digit_count()
111 .try_into()
112 .unwrap_or_default()
113 });
114 let scale_decimal = bigdecimal::BigDecimal::from_f32(10f32.powi(scale))?;
115 (decimal * scale_decimal).to_i128()
116}