datafusion_remote_table/connection/
mod.rs1mod mysql;
2mod oracle;
3mod postgres;
4mod sqlite;
5
6pub use mysql::*;
7pub use oracle::*;
8pub use postgres::*;
9
10use crate::{DFResult, RemoteSchema, Transform};
11use datafusion::arrow::datatypes::SchemaRef;
12use datafusion::execution::SendableRecordBatchStream;
13use std::fmt::Debug;
14use std::path::PathBuf;
15
16#[async_trait::async_trait]
17pub trait Connection: Debug + Send + Sync {
18 async fn infer_schema(
19 &self,
20 sql: &str,
21 transform: Option<&dyn Transform>,
22 ) -> DFResult<(RemoteSchema, SchemaRef)>;
23
24 async fn query(
25 &self,
26 sql: String,
27 projection: Option<Vec<usize>>,
28 ) -> DFResult<(SendableRecordBatchStream, RemoteSchema)>;
29}
30
31pub async fn connect(options: &ConnectionOptions) -> DFResult<Box<dyn Connection>> {
32 match options {
33 ConnectionOptions::Postgres(options) => {
34 let conn = connect_postgres(options).await?;
35 Ok(Box::new(conn))
36 }
37 ConnectionOptions::Oracle(_options) => {
38 todo!()
39 }
40 ConnectionOptions::Mysql(_options) => {
41 todo!()
42 }
43 ConnectionOptions::Sqlite(_) => {
44 todo!()
45 }
46 }
47}
48
49#[derive(Debug, Clone)]
50pub enum ConnectionOptions {
51 Postgres(PostgresConnectionOptions),
52 Oracle(OracleConnectionOptions),
53 Mysql(MysqlConnectionOptions),
54 Sqlite(PathBuf),
55}
56
/// Reports whether column `col_idx` is selected by `projection`.
///
/// A missing projection (`None`) selects every column, so the answer is
/// always `true`; otherwise the index must appear in the projection list.
pub(crate) fn projections_contains(projection: Option<&Vec<usize>>, col_idx: usize) -> bool {
    projection.map_or(true, |selected| selected.contains(&col_idx))
}