datafusion_remote_table/connection/
mod.rs1mod mysql;
2mod oracle;
3mod postgres;
4mod sqlite;
5
6pub use mysql::*;
7pub use oracle::*;
8pub use postgres::*;
9
10use crate::{DFResult, RemoteSchema, Transform};
11use datafusion::arrow::datatypes::SchemaRef;
12use datafusion::execution::SendableRecordBatchStream;
13use std::fmt::Debug;
14use std::path::PathBuf;
15use std::sync::Arc;
16
17#[async_trait::async_trait]
18pub trait Pool: Debug + Send + Sync {
19 async fn get(&self) -> DFResult<Arc<dyn Connection>>;
20}
21
22#[async_trait::async_trait]
23pub trait Connection: Debug + Send + Sync {
24 async fn infer_schema(
25 &self,
26 sql: &str,
27 transform: Option<&dyn Transform>,
28 ) -> DFResult<(RemoteSchema, SchemaRef)>;
29
30 async fn query(
31 &self,
32 sql: String,
33 projection: Option<Vec<usize>>,
34 ) -> DFResult<(SendableRecordBatchStream, RemoteSchema)>;
35}
36
37pub async fn connect(options: &ConnectionOptions) -> DFResult<Arc<dyn Pool>> {
38 match options {
39 ConnectionOptions::Postgres(options) => {
40 let pool = connect_postgres(options).await?;
41 Ok(Arc::new(pool))
42 }
43 ConnectionOptions::Oracle(_options) => {
44 todo!()
45 }
46 ConnectionOptions::Mysql(_options) => {
47 todo!()
48 }
49 ConnectionOptions::Sqlite(_) => {
50 todo!()
51 }
52 }
53}
54
55#[derive(Debug, Clone)]
56pub enum ConnectionOptions {
57 Postgres(PostgresConnectionOptions),
58 Oracle(OracleConnectionOptions),
59 Mysql(MysqlConnectionOptions),
60 Sqlite(PathBuf),
61}
62
63pub(crate) fn projections_contains(projection: Option<&Vec<usize>>, col_idx: usize) -> bool {
64 match projection {
65 Some(p) => p.contains(&col_idx),
66 None => true,
67 }
68}