datafusion_remote_table/connection/
mod.rs

1mod mysql;
2mod oracle;
3mod postgres;
4mod sqlite;
5
6pub use mysql::*;
7pub use oracle::*;
8pub use postgres::*;
9
10use crate::connection::sqlite::connect_sqlite;
11use crate::{DFResult, RemoteSchema, Transform};
12use datafusion::arrow::datatypes::SchemaRef;
13use datafusion::execution::SendableRecordBatchStream;
14use std::fmt::Debug;
15use std::path::PathBuf;
16use std::sync::Arc;
17
18#[async_trait::async_trait]
19pub trait Pool: Debug + Send + Sync {
20    async fn get(&self) -> DFResult<Arc<dyn Connection>>;
21}
22
23#[async_trait::async_trait]
24pub trait Connection: Debug + Send + Sync {
25    async fn infer_schema(
26        &self,
27        sql: &str,
28        transform: Option<Arc<dyn Transform>>,
29    ) -> DFResult<(RemoteSchema, SchemaRef)>;
30
31    async fn query(
32        &self,
33        sql: String,
34        projection: Option<Vec<usize>>,
35    ) -> DFResult<(SendableRecordBatchStream, RemoteSchema)>;
36}
37
38pub async fn connect(options: &ConnectionOptions) -> DFResult<Arc<dyn Pool>> {
39    match options {
40        ConnectionOptions::Postgres(options) => {
41            let pool = connect_postgres(options).await?;
42            Ok(Arc::new(pool))
43        }
44        ConnectionOptions::Mysql(options) => {
45            let pool = connect_mysql(options)?;
46            Ok(Arc::new(pool))
47        }
48        ConnectionOptions::Oracle(options) => {
49            let pool = connect_oracle(options).await?;
50            Ok(Arc::new(pool))
51        }
52        ConnectionOptions::Sqlite(path) => {
53            let pool = connect_sqlite(path).await?;
54            Ok(Arc::new(pool))
55        }
56    }
57}
58
59#[derive(Debug, Clone)]
60pub enum ConnectionOptions {
61    Postgres(PostgresConnectionOptions),
62    Oracle(OracleConnectionOptions),
63    Mysql(MysqlConnectionOptions),
64    Sqlite(PathBuf),
65}
66
67pub(crate) fn projections_contains(projection: Option<&Vec<usize>>, col_idx: usize) -> bool {
68    match projection {
69        Some(p) => p.contains(&col_idx),
70        None => true,
71    }
72}