datafusion_remote_table/connection/
options.rs

1use crate::RemoteDbType;
2use derive_getters::Getters;
3use derive_with::With;
4use std::{path::PathBuf, time::Duration};
5
6#[derive(Debug, Clone)]
7pub enum ConnectionOptions {
8    Postgres(PostgresConnectionOptions),
9    Oracle(OracleConnectionOptions),
10    Mysql(MysqlConnectionOptions),
11    Sqlite(SqliteConnectionOptions),
12    Dm(DmConnectionOptions),
13}
14
15impl ConnectionOptions {
16    pub(crate) fn stream_chunk_size(&self) -> usize {
17        match self {
18            ConnectionOptions::Postgres(options) => options.stream_chunk_size,
19            ConnectionOptions::Oracle(options) => options.stream_chunk_size,
20            ConnectionOptions::Mysql(options) => options.stream_chunk_size,
21            ConnectionOptions::Sqlite(options) => options.stream_chunk_size,
22            ConnectionOptions::Dm(options) => options.stream_chunk_size,
23        }
24    }
25
26    pub(crate) fn db_type(&self) -> RemoteDbType {
27        match self {
28            ConnectionOptions::Postgres(_) => RemoteDbType::Postgres,
29            ConnectionOptions::Oracle(_) => RemoteDbType::Oracle,
30            ConnectionOptions::Mysql(_) => RemoteDbType::Mysql,
31            ConnectionOptions::Sqlite(_) => RemoteDbType::Sqlite,
32            ConnectionOptions::Dm(_) => RemoteDbType::Dm,
33        }
34    }
35
36    pub fn with_pool_max_size(self, pool_max_size: usize) -> Self {
37        match self {
38            ConnectionOptions::Postgres(options) => {
39                ConnectionOptions::Postgres(options.with_pool_max_size(pool_max_size))
40            }
41            ConnectionOptions::Oracle(options) => {
42                ConnectionOptions::Oracle(options.with_pool_max_size(pool_max_size))
43            }
44            ConnectionOptions::Mysql(options) => {
45                ConnectionOptions::Mysql(options.with_pool_max_size(pool_max_size))
46            }
47            ConnectionOptions::Sqlite(options) => ConnectionOptions::Sqlite(options),
48            ConnectionOptions::Dm(options) => ConnectionOptions::Dm(options),
49        }
50    }
51}
52
53#[derive(Debug, Clone, With, Getters)]
54pub struct PostgresConnectionOptions {
55    pub(crate) host: String,
56    pub(crate) port: u16,
57    pub(crate) username: String,
58    pub(crate) password: String,
59    pub(crate) database: Option<String>,
60    pub(crate) pool_max_size: usize,
61    pub(crate) pool_min_idle: usize,
62    pub(crate) pool_idle_timeout: Duration,
63    pub(crate) pool_ttl_check_interval: Duration,
64    pub(crate) stream_chunk_size: usize,
65    pub(crate) default_numeric_scale: i8,
66}
67
68impl PostgresConnectionOptions {
69    pub fn new(
70        host: impl Into<String>,
71        port: u16,
72        username: impl Into<String>,
73        password: impl Into<String>,
74    ) -> Self {
75        Self {
76            host: host.into(),
77            port,
78            username: username.into(),
79            password: password.into(),
80            database: None,
81            pool_max_size: 10,
82            pool_min_idle: 0,
83            pool_idle_timeout: Duration::from_secs(10 * 60),
84            pool_ttl_check_interval: Duration::from_secs(30),
85            stream_chunk_size: 2048,
86            default_numeric_scale: 10,
87        }
88    }
89}
90
91impl From<PostgresConnectionOptions> for ConnectionOptions {
92    fn from(options: PostgresConnectionOptions) -> Self {
93        ConnectionOptions::Postgres(options)
94    }
95}
96
97#[derive(Debug, Clone, With, Getters)]
98pub struct MysqlConnectionOptions {
99    pub(crate) host: String,
100    pub(crate) port: u16,
101    pub(crate) username: String,
102    pub(crate) password: String,
103    pub(crate) database: Option<String>,
104    pub(crate) pool_max_size: usize,
105    pub(crate) pool_min_idle: usize,
106    pub(crate) pool_idle_timeout: Duration,
107    pub(crate) pool_ttl_check_interval: Duration,
108    pub(crate) stream_chunk_size: usize,
109}
110
111impl MysqlConnectionOptions {
112    pub fn new(
113        host: impl Into<String>,
114        port: u16,
115        username: impl Into<String>,
116        password: impl Into<String>,
117    ) -> Self {
118        Self {
119            host: host.into(),
120            port,
121            username: username.into(),
122            password: password.into(),
123            database: None,
124            pool_max_size: 10,
125            pool_min_idle: 0,
126            pool_idle_timeout: Duration::from_secs(10 * 60),
127            pool_ttl_check_interval: Duration::from_secs(30),
128            stream_chunk_size: 2048,
129        }
130    }
131}
132
133impl From<MysqlConnectionOptions> for ConnectionOptions {
134    fn from(options: MysqlConnectionOptions) -> Self {
135        ConnectionOptions::Mysql(options)
136    }
137}
138
139#[derive(Debug, Clone, With, Getters)]
140pub struct OracleConnectionOptions {
141    pub(crate) host: String,
142    pub(crate) port: u16,
143    pub(crate) username: String,
144    pub(crate) password: String,
145    pub(crate) service_name: String,
146    pub(crate) pool_max_size: usize,
147    pub(crate) pool_min_idle: usize,
148    pub(crate) pool_idle_timeout: Duration,
149    pub(crate) pool_ttl_check_interval: Duration,
150    pub(crate) stream_chunk_size: usize,
151}
152
153impl OracleConnectionOptions {
154    pub fn new(
155        host: impl Into<String>,
156        port: u16,
157        username: impl Into<String>,
158        password: impl Into<String>,
159        service_name: impl Into<String>,
160    ) -> Self {
161        Self {
162            host: host.into(),
163            port,
164            username: username.into(),
165            password: password.into(),
166            service_name: service_name.into(),
167            pool_max_size: 10,
168            pool_min_idle: 0,
169            pool_idle_timeout: Duration::from_secs(10 * 60),
170            pool_ttl_check_interval: Duration::from_secs(30),
171            stream_chunk_size: 2048,
172        }
173    }
174}
175
176impl From<OracleConnectionOptions> for ConnectionOptions {
177    fn from(options: OracleConnectionOptions) -> Self {
178        ConnectionOptions::Oracle(options)
179    }
180}
181
182#[derive(Debug, Clone, With, Getters)]
183pub struct SqliteConnectionOptions {
184    pub path: PathBuf,
185    pub stream_chunk_size: usize,
186}
187
188impl SqliteConnectionOptions {
189    pub fn new(path: PathBuf) -> Self {
190        Self {
191            path,
192            stream_chunk_size: 2048,
193        }
194    }
195}
196
197impl From<SqliteConnectionOptions> for ConnectionOptions {
198    fn from(options: SqliteConnectionOptions) -> Self {
199        ConnectionOptions::Sqlite(options)
200    }
201}
202
203#[derive(Debug, Clone, With, Getters)]
204pub struct DmConnectionOptions {
205    pub(crate) host: String,
206    pub(crate) port: u16,
207    pub(crate) username: String,
208    pub(crate) password: String,
209    pub(crate) schema: Option<String>,
210    pub(crate) stream_chunk_size: usize,
211    pub(crate) driver: String,
212}
213
214impl DmConnectionOptions {
215    pub fn new(
216        host: impl Into<String>,
217        port: u16,
218        username: impl Into<String>,
219        password: impl Into<String>,
220    ) -> Self {
221        Self {
222            host: host.into(),
223            port,
224            username: username.into(),
225            password: password.into(),
226            schema: None,
227            stream_chunk_size: 1024,
228            driver: "DM8 ODBC DRIVER".to_string(),
229        }
230    }
231}
232
233impl From<DmConnectionOptions> for ConnectionOptions {
234    fn from(options: DmConnectionOptions) -> Self {
235        ConnectionOptions::Dm(options)
236    }
237}