datafusion_remote_table/connection/
mod.rs1mod postgres;
2
3use crate::connection::postgres::connect_postgres;
4use crate::{DFResult, RemoteSchema, Transform};
5use datafusion::arrow::datatypes::SchemaRef;
6use datafusion::execution::SendableRecordBatchStream;
7use std::fmt::Debug;
8use std::path::PathBuf;
9
10#[async_trait::async_trait]
11pub trait Connection: Debug + Send + Sync {
12 async fn infer_schema(
13 &self,
14 sql: &str,
15 transform: Option<&dyn Transform>,
16 ) -> DFResult<(RemoteSchema, SchemaRef)>;
17 async fn query(
18 &self,
19 sql: String,
20 projection: Option<Vec<usize>>,
21 ) -> DFResult<(SendableRecordBatchStream, RemoteSchema)>;
22}
23
24pub async fn connect(args: &ConnectionArgs) -> DFResult<Box<dyn Connection>> {
25 match args {
26 ConnectionArgs::Postgresql {
27 host,
28 port,
29 username,
30 password,
31 database,
32 } => {
33 let conn =
34 connect_postgres(host, *port, username, password, database.as_deref()).await?;
35 Ok(Box::new(conn))
36 }
37 ConnectionArgs::Oracle { .. } => {
38 todo!()
39 }
40 ConnectionArgs::Mysql { .. } => {
41 todo!()
42 }
43 ConnectionArgs::Sqlite(_) => {
44 todo!()
45 }
46 }
47}
48
49#[derive(Debug, Clone)]
50pub enum ConnectionArgs {
51 Postgresql {
52 host: String,
53 port: u16,
54 username: String,
55 password: String,
56 database: Option<String>,
57 },
58 Oracle {
59 host: String,
60 port: u16,
61 username: String,
62 password: String,
63 database: Option<String>,
64 },
65 Mysql {
66 host: String,
67 port: u16,
68 username: String,
69 password: String,
70 database: Option<String>,
71 },
72 Sqlite(PathBuf),
73}
74
75pub(crate) fn projections_contains(projection: Option<&Vec<usize>>, col_idx: usize) -> bool {
76 match projection {
77 Some(p) => p.contains(&col_idx),
78 None => true,
79 }
80}