sqlx_exasol/options/
mod.rs1mod builder;
2mod error;
3mod protocol_version;
4mod ssl_mode;
5
6use std::{borrow::Cow, net::SocketAddr, num::NonZeroUsize, path::PathBuf, str::FromStr};
7
8pub use builder::ExaConnectOptionsBuilder;
9use error::ExaConfigError;
10pub use protocol_version::ProtocolVersion;
11use sqlx_core::{
12 connection::{ConnectOptions, LogSettings},
13 net::tls::CertificateInput,
14};
15pub use ssl_mode::ExaSslMode;
16use tracing::log;
17use url::Url;
18
19use crate::{
20 connection::{
21 websocket::request::{ExaLoginRequest, LoginRef},
22 ExaConnection,
23 },
24 responses::ExaRwAttributes,
25 SqlxError, SqlxResult,
26};
27
28const URL_SCHEME: &str = "exa";
29
30const DEFAULT_FETCH_SIZE: usize = 5 * 1024 * 1024;
31const DEFAULT_PORT: u16 = 8563;
32const DEFAULT_CACHE_CAPACITY: NonZeroUsize = match NonZeroUsize::new(100) {
33 Some(v) => v,
34 None => unreachable!(),
35};
36
37const PARAM_ACCESS_TOKEN: &str = "access-token";
38const PARAM_REFRESH_TOKEN: &str = "refresh-token";
39const PARAM_PROTOCOL_VERSION: &str = "protocol-version";
40const PARAM_SSL_MODE: &str = "ssl-mode";
41const PARAM_SSL_CA: &str = "ssl-ca";
42const PARAM_SSL_CERT: &str = "ssl-cert";
43const PARAM_SSL_KEY: &str = "ssl-key";
44const PARAM_CACHE_CAP: &str = "statement-cache-capacity";
45const PARAM_FETCH_SIZE: &str = "fetch-size";
46const PARAM_QUERY_TIMEOUT: &str = "query-timeout";
47const PARAM_COMPRESSION: &str = "compression";
48const PARAM_FEEDBACK_INTERVAL: &str = "feedback-interval";
49
50#[derive(Debug, Clone)]
55pub struct ExaConnectOptions {
56 pub(crate) hosts_details: Vec<(String, Vec<SocketAddr>)>,
57 pub(crate) port: u16,
58 pub(crate) ssl_mode: ExaSslMode,
59 pub(crate) ssl_ca: Option<CertificateInput>,
60 pub(crate) ssl_client_cert: Option<CertificateInput>,
61 pub(crate) ssl_client_key: Option<CertificateInput>,
62 pub(crate) statement_cache_capacity: NonZeroUsize,
63 pub(crate) schema: Option<String>,
64 pub(crate) compression: bool,
65 login: Login,
66 protocol_version: ProtocolVersion,
67 fetch_size: usize,
68 query_timeout: u64,
69 feedback_interval: u8,
70 log_settings: LogSettings,
71}
72
73impl ExaConnectOptions {
74 #[must_use]
75 pub fn builder() -> ExaConnectOptionsBuilder {
76 ExaConnectOptionsBuilder::default()
77 }
78}
79
80impl FromStr for ExaConnectOptions {
81 type Err = SqlxError;
82
83 fn from_str(s: &str) -> Result<Self, Self::Err> {
84 let url = Url::parse(s)
85 .map_err(From::from)
86 .map_err(SqlxError::Configuration)?;
87 Self::from_url(&url)
88 }
89}
90
91impl ConnectOptions for ExaConnectOptions {
92 type Connection = ExaConnection;
93
94 fn from_url(url: &Url) -> SqlxResult<Self> {
95 let scheme = url.scheme();
96
97 if URL_SCHEME != scheme {
98 return Err(ExaConfigError::InvalidUrlScheme(scheme.to_owned()).into());
99 }
100
101 let mut builder = Self::builder();
102
103 if let Some(host) = url.host_str() {
104 builder = builder.host(host.to_owned());
105 }
106
107 let username = url.username();
108 if !username.is_empty() {
109 builder = builder.username(username.to_owned());
110 }
111
112 if let Some(password) = url.password() {
113 builder = builder.password(password.to_owned());
114 }
115
116 if let Some(port) = url.port() {
117 builder = builder.port(port);
118 }
119
120 let opt_schema = url.path_segments().into_iter().flatten().next();
121
122 if let Some(schema) = opt_schema {
123 builder = builder.schema(schema.to_owned());
124 }
125
126 for (name, value) in url.query_pairs() {
127 match name.as_ref() {
128 PARAM_ACCESS_TOKEN => builder = builder.access_token(value.to_string()),
129
130 PARAM_REFRESH_TOKEN => builder = builder.refresh_token(value.to_string()),
131
132 PARAM_PROTOCOL_VERSION => {
133 let protocol_version = value.parse::<ProtocolVersion>()?;
134 builder = builder.protocol_version(protocol_version);
135 }
136
137 PARAM_SSL_MODE => {
138 let ssl_mode = value.parse::<ExaSslMode>()?;
139 builder = builder.ssl_mode(ssl_mode);
140 }
141
142 PARAM_SSL_CA => {
143 let ssl_ca = CertificateInput::File(PathBuf::from(value.to_string()));
144 builder = builder.ssl_ca(ssl_ca);
145 }
146
147 PARAM_SSL_CERT => {
148 let ssl_cert = CertificateInput::File(PathBuf::from(value.to_string()));
149 builder = builder.ssl_client_cert(ssl_cert);
150 }
151
152 PARAM_SSL_KEY => {
153 let ssl_key = CertificateInput::File(PathBuf::from(value.to_string()));
154 builder = builder.ssl_client_key(ssl_key);
155 }
156
157 PARAM_CACHE_CAP => {
158 let capacity = value
159 .parse::<NonZeroUsize>()
160 .map_err(|_| ExaConfigError::InvalidParameter(PARAM_CACHE_CAP))?;
161 builder = builder.statement_cache_capacity(capacity);
162 }
163
164 PARAM_FETCH_SIZE => {
165 let fetch_size = value
166 .parse::<usize>()
167 .map_err(|_| ExaConfigError::InvalidParameter(PARAM_FETCH_SIZE))?;
168 builder = builder.fetch_size(fetch_size);
169 }
170
171 PARAM_QUERY_TIMEOUT => {
172 let query_timeout = value
173 .parse::<u64>()
174 .map_err(|_| ExaConfigError::InvalidParameter(PARAM_QUERY_TIMEOUT))?;
175 builder = builder.query_timeout(query_timeout);
176 }
177
178 PARAM_COMPRESSION => {
179 let compression = value
180 .parse::<bool>()
181 .map_err(|_| ExaConfigError::InvalidParameter(PARAM_COMPRESSION))?;
182 builder = builder.compression(compression);
183 }
184
185 PARAM_FEEDBACK_INTERVAL => {
186 let feedback_interval = value
187 .parse::<u8>()
188 .map_err(|_| ExaConfigError::InvalidParameter(PARAM_FEEDBACK_INTERVAL))?;
189 builder = builder.feedback_interval(feedback_interval);
190 }
191
192 _ => {
193 return Err(SqlxError::Protocol(format!(
194 "Unknown connection string parameter: {value}"
195 )))
196 }
197 };
198 }
199
200 builder.build()
201 }
202
203 fn connect(&self) -> futures_util::future::BoxFuture<'_, SqlxResult<Self::Connection>>
204 where
205 Self::Connection: Sized,
206 {
207 Box::pin(ExaConnection::establish(self))
208 }
209
210 fn log_statements(mut self, level: log::LevelFilter) -> Self {
211 self.log_settings.log_statements(level);
212 self
213 }
214
215 fn log_slow_statements(
216 mut self,
217 level: log::LevelFilter,
218 duration: std::time::Duration,
219 ) -> Self {
220 self.log_settings.log_slow_statements(level, duration);
221 self
222 }
223}
224
225impl<'a> From<&'a ExaConnectOptions> for ExaLoginRequest<'a> {
226 fn from(value: &'a ExaConnectOptions) -> Self {
227 let crate_version = option_env!("CARGO_PKG_VERSION").unwrap_or("UNKNOWN");
228
229 let attributes = ExaRwAttributes::new(
230 value.schema.as_deref().map(Cow::Borrowed),
231 value.feedback_interval.into(),
232 value.query_timeout,
233 );
234
235 Self {
236 protocol_version: value.protocol_version,
237 fetch_size: value.fetch_size,
238 statement_cache_capacity: value.statement_cache_capacity,
239 login: (&value.login).into(),
240 use_compression: value.compression,
241 client_name: "sqlx-exasol",
242 client_version: crate_version,
243 client_os: std::env::consts::OS,
244 client_runtime: "RUST",
245 attributes,
246 }
247 }
248}
249
250#[derive(Clone, Debug)]
253pub enum Login {
254 Credentials { username: String, password: String },
255 AccessToken { access_token: String },
256 RefreshToken { refresh_token: String },
257}
258
259impl<'a> From<&'a Login> for LoginRef<'a> {
260 fn from(value: &'a Login) -> Self {
261 match value {
262 Login::Credentials { username, password } => LoginRef::Credentials {
263 username,
264 password: Cow::Borrowed(password),
265 },
266 Login::AccessToken { access_token } => LoginRef::AccessToken { access_token },
267 Login::RefreshToken { refresh_token } => LoginRef::RefreshToken { refresh_token },
268 }
269 }
270}
271
272#[derive(Debug, Clone, Copy)]
274#[allow(clippy::struct_field_names)]
275pub struct ExaTlsOptionsRef<'a> {
276 pub ssl_mode: ExaSslMode,
277 pub ssl_ca: Option<&'a CertificateInput>,
278 pub ssl_client_cert: Option<&'a CertificateInput>,
279 pub ssl_client_key: Option<&'a CertificateInput>,
280}
281
282impl<'a> From<&'a ExaConnectOptions> for ExaTlsOptionsRef<'a> {
283 fn from(value: &'a ExaConnectOptions) -> Self {
284 ExaTlsOptionsRef {
285 ssl_mode: value.ssl_mode,
286 ssl_ca: value.ssl_ca.as_ref(),
287 ssl_client_cert: value.ssl_client_cert.as_ref(),
288 ssl_client_key: value.ssl_client_key.as_ref(),
289 }
290 }
291}