datafusion_remote_table/connection/
mod.rs

1mod mysql;
2mod oracle;
3mod postgres;
4mod sqlite;
5
6pub use mysql::*;
7pub use oracle::*;
8pub use postgres::*;
9
10use crate::{DFResult, RemoteSchema, Transform};
11use datafusion::arrow::datatypes::SchemaRef;
12use datafusion::execution::SendableRecordBatchStream;
13use std::fmt::Debug;
14use std::path::PathBuf;
15
16#[async_trait::async_trait]
17pub trait Connection: Debug + Send + Sync {
18    async fn infer_schema(
19        &self,
20        sql: &str,
21        transform: Option<&dyn Transform>,
22    ) -> DFResult<(RemoteSchema, SchemaRef)>;
23
24    async fn query(
25        &self,
26        sql: String,
27        projection: Option<Vec<usize>>,
28    ) -> DFResult<(SendableRecordBatchStream, RemoteSchema)>;
29}
30
31pub async fn connect(options: &ConnectionOptions) -> DFResult<Box<dyn Connection>> {
32    match options {
33        ConnectionOptions::Postgres(options) => {
34            let conn = connect_postgres(options).await?;
35            Ok(Box::new(conn))
36        }
37        ConnectionOptions::Oracle(_options) => {
38            todo!()
39        }
40        ConnectionOptions::Mysql(_options) => {
41            todo!()
42        }
43        ConnectionOptions::Sqlite(_) => {
44            todo!()
45        }
46    }
47}
48
49#[derive(Debug, Clone)]
50pub enum ConnectionOptions {
51    Postgres(PostgresConnectionOptions),
52    Oracle(OracleConnectionOptions),
53    Mysql(MysqlConnectionOptions),
54    Sqlite(PathBuf),
55}
56
/// Reports whether `col_idx` is selected by `projection`.
///
/// With no projection (`None`) every index counts as selected, so the result
/// is `true`. Otherwise the result is `true` only when `col_idx` appears in
/// the projection list.
pub(crate) fn projections_contains(projection: Option<&Vec<usize>>, col_idx: usize) -> bool {
    projection.map_or(true, |selected| selected.contains(&col_idx))
}