datafusion_remote_table/connection/
mod.rs

1mod postgres;
2
3use crate::connection::postgres::connect_postgres;
4use crate::{DFResult, RemoteSchema, Transform};
5use datafusion::arrow::datatypes::SchemaRef;
6use datafusion::execution::SendableRecordBatchStream;
7use std::fmt::Debug;
8use std::path::PathBuf;
9
10#[async_trait::async_trait]
11pub trait Connection: Debug + Send + Sync {
12    async fn infer_schema(
13        &self,
14        sql: &str,
15        transform: Option<&dyn Transform>,
16    ) -> DFResult<(RemoteSchema, SchemaRef)>;
17    async fn query(
18        &self,
19        sql: String,
20        projection: Option<Vec<usize>>,
21    ) -> DFResult<(SendableRecordBatchStream, RemoteSchema)>;
22}
23
24pub async fn connect(args: &ConnectionArgs) -> DFResult<Box<dyn Connection>> {
25    match args {
26        ConnectionArgs::Postgresql {
27            host,
28            port,
29            username,
30            password,
31            database,
32        } => {
33            let conn =
34                connect_postgres(host, *port, username, password, database.as_deref()).await?;
35            Ok(Box::new(conn))
36        }
37        ConnectionArgs::Oracle { .. } => {
38            todo!()
39        }
40        ConnectionArgs::Mysql { .. } => {
41            todo!()
42        }
43        ConnectionArgs::Sqlite(_) => {
44            todo!()
45        }
46    }
47}
48
49#[derive(Debug, Clone)]
50pub enum ConnectionArgs {
51    Postgresql {
52        host: String,
53        port: u16,
54        username: String,
55        password: String,
56        database: Option<String>,
57    },
58    Oracle {
59        host: String,
60        port: u16,
61        username: String,
62        password: String,
63        database: Option<String>,
64    },
65    Mysql {
66        host: String,
67        port: u16,
68        username: String,
69        password: String,
70        database: Option<String>,
71    },
72    Sqlite(PathBuf),
73}
74
75pub(crate) fn projections_contains(projection: Option<&Vec<usize>>, col_idx: usize) -> bool {
76    match projection {
77        Some(p) => p.contains(&col_idx),
78        None => true,
79    }
80}