datafusion_remote_table/connection/
mod.rs

1mod mysql;
2mod oracle;
3mod postgres;
4mod sqlite;
5
6pub use mysql::*;
7pub use oracle::*;
8pub use postgres::*;
9
10use crate::{DFResult, RemoteSchema, Transform};
11use datafusion::arrow::datatypes::SchemaRef;
12use datafusion::execution::SendableRecordBatchStream;
13use std::fmt::Debug;
14use std::path::PathBuf;
15use std::sync::Arc;
16
17#[async_trait::async_trait]
18pub trait Pool: Debug + Send + Sync {
19    async fn get(&self) -> DFResult<Arc<dyn Connection>>;
20}
21
22#[async_trait::async_trait]
23pub trait Connection: Debug + Send + Sync {
24    async fn infer_schema(
25        &self,
26        sql: &str,
27        transform: Option<&dyn Transform>,
28    ) -> DFResult<(RemoteSchema, SchemaRef)>;
29
30    async fn query(
31        &self,
32        sql: String,
33        projection: Option<Vec<usize>>,
34    ) -> DFResult<(SendableRecordBatchStream, RemoteSchema)>;
35}
36
37pub async fn connect(options: &ConnectionOptions) -> DFResult<Arc<dyn Pool>> {
38    match options {
39        ConnectionOptions::Postgres(options) => {
40            let pool = connect_postgres(options).await?;
41            Ok(Arc::new(pool))
42        }
43        ConnectionOptions::Oracle(_options) => {
44            todo!()
45        }
46        ConnectionOptions::Mysql(_options) => {
47            todo!()
48        }
49        ConnectionOptions::Sqlite(_) => {
50            todo!()
51        }
52    }
53}
54
55#[derive(Debug, Clone)]
56pub enum ConnectionOptions {
57    Postgres(PostgresConnectionOptions),
58    Oracle(OracleConnectionOptions),
59    Mysql(MysqlConnectionOptions),
60    Sqlite(PathBuf),
61}
62
/// Reports whether column `col_idx` is selected by `projection`.
///
/// A missing projection (`None`) selects every column, so the answer is
/// always `true`; otherwise the index must appear in the projection list.
pub(crate) fn projections_contains(projection: Option<&Vec<usize>>, col_idx: usize) -> bool {
    projection.map_or(true, |selected| selected.contains(&col_idx))
}