datafusion_remote_table/connection/
mod.rs1mod mysql;
2mod oracle;
3mod postgres;
4mod sqlite;
5
6pub use mysql::*;
7pub use oracle::*;
8pub use postgres::*;
9
10use crate::connection::sqlite::connect_sqlite;
11use crate::{DFResult, RemoteSchema, Transform};
12use datafusion::arrow::datatypes::SchemaRef;
13use datafusion::execution::SendableRecordBatchStream;
14use std::fmt::Debug;
15use std::path::PathBuf;
16use std::sync::Arc;
17
18#[async_trait::async_trait]
19pub trait Pool: Debug + Send + Sync {
20 async fn get(&self) -> DFResult<Arc<dyn Connection>>;
21}
22
23#[async_trait::async_trait]
24pub trait Connection: Debug + Send + Sync {
25 async fn infer_schema(
26 &self,
27 sql: &str,
28 transform: Option<Arc<dyn Transform>>,
29 ) -> DFResult<(RemoteSchema, SchemaRef)>;
30
31 async fn query(
32 &self,
33 sql: String,
34 projection: Option<Vec<usize>>,
35 ) -> DFResult<(SendableRecordBatchStream, RemoteSchema)>;
36}
37
38pub async fn connect(options: &ConnectionOptions) -> DFResult<Arc<dyn Pool>> {
39 match options {
40 ConnectionOptions::Postgres(options) => {
41 let pool = connect_postgres(options).await?;
42 Ok(Arc::new(pool))
43 }
44 ConnectionOptions::Mysql(options) => {
45 let pool = connect_mysql(options)?;
46 Ok(Arc::new(pool))
47 }
48 ConnectionOptions::Oracle(options) => {
49 let pool = connect_oracle(options).await?;
50 Ok(Arc::new(pool))
51 }
52 ConnectionOptions::Sqlite(path) => {
53 let pool = connect_sqlite(path).await?;
54 Ok(Arc::new(pool))
55 }
56 }
57}
58
59#[derive(Debug, Clone)]
60pub enum ConnectionOptions {
61 Postgres(PostgresConnectionOptions),
62 Oracle(OracleConnectionOptions),
63 Mysql(MysqlConnectionOptions),
64 Sqlite(PathBuf),
65}
66
67pub(crate) fn projections_contains(projection: Option<&Vec<usize>>, col_idx: usize) -> bool {
68 match projection {
69 Some(p) => p.contains(&col_idx),
70 None => true,
71 }
72}