1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::str::FromStr;
4use std::time::Duration;
5
6use clickhouse_arrow::prelude::Secret;
7use clickhouse_arrow::{
8 ArrowConnectionPoolBuilder, ArrowOptions, CompressionMethod, CreateOptions, Destination,
9 Settings,
10};
11use datafusion::common::{exec_err, plan_err};
12use datafusion::error::{DataFusionError, Result};
13use datafusion::logical_expr::Expr;
14use datafusion::prelude::lit;
15use datafusion::sql::sqlparser::ast;
16use datafusion::sql::unparser::Unparser;
17use datafusion::sql::unparser::dialect::Dialect;
18
19use crate::default_arrow_options;
20use crate::dialect::ClickHouseDialect;
21
22pub(crate) const ENDPOINT_PARAM: &str = "endpoint";
24pub(crate) const USERNAME_PARAM: &str = "username";
25pub(crate) const PASSWORD_PARAM: &str = "password";
26pub(crate) const DEFAULT_DATABASE_PARAM: &str = "default_database";
27pub(crate) const COMPRESSION_PARAM: &str = "compression";
28pub(crate) const DOMAIN_PARAM: &str = "domain";
29pub(crate) const CAFILE_PARAM: &str = "cafile";
30pub(crate) const USE_TLS_PARAM: &str = "use_tls";
31pub(crate) const STRINGS_AS_STRINGS_PARAM: &str = "strings_as_strings";
32pub(crate) const CLOUD_TIMEOUT_PARAM: &str = "cloud_timeout";
33pub(crate) const CLOUD_WAKEUP_PARAM: &str = "cloud_wakeup";
34pub(crate) const POOL_MAX_SIZE_PARAM: &str = "pool_max_size";
35pub(crate) const POOL_MIN_IDLE_PARAM: &str = "pool_min_idle";
36pub(crate) const POOL_TEST_ON_CHECK_OUT_PARAM: &str = "pool_test_on_check_out";
37pub(crate) const POOL_MAX_LIFETIME_PARAM: &str = "pool_max_lifetime";
38pub(crate) const POOL_IDLE_TIMEOUT_PARAM: &str = "pool_idle_timeout";
39pub(crate) const POOL_CONNECTION_TIMEOUT_PARAM: &str = "pool_connection_timeout";
40pub(crate) const POOL_RETRY_CONNECTION_PARAM: &str = "pool_retry_connection";
41
42pub(crate) const ENGINE_PARAM: &str = "engine";
44pub(crate) const ORDER_BY_PARAM: &str = "order_by";
45pub(crate) const PRIMARY_KEYS_PARAM: &str = "primary_keys";
46pub(crate) const PARTITION_BY_PARAM: &str = "partition_by";
47pub(crate) const SAMPLING_PARAM: &str = "sampling";
48pub(crate) const TTL_PARAM: &str = "ttl";
49pub(crate) const DEFAULTS_PARAM: &str = "defaults";
50pub(crate) const DEFAULTS_FOR_NULLABLE_PARAM: &str = "defaults_for_nullable";
51
52pub(crate) const ALL_PARAMS: &[&str; 16] = &[
53 ENDPOINT_PARAM,
54 USERNAME_PARAM,
55 PASSWORD_PARAM,
56 DEFAULT_DATABASE_PARAM,
57 COMPRESSION_PARAM,
58 DOMAIN_PARAM,
59 CAFILE_PARAM,
60 USE_TLS_PARAM,
61 STRINGS_AS_STRINGS_PARAM,
62 CLOUD_TIMEOUT_PARAM,
63 CLOUD_WAKEUP_PARAM,
64 ENGINE_PARAM,
65 ORDER_BY_PARAM,
66 SAMPLING_PARAM,
67 TTL_PARAM,
68 DEFAULTS_FOR_NULLABLE_PARAM,
69];
70
71fn parse_param_vec(param: &str) -> Vec<String> {
73 param.split(',').map(ToString::to_string).collect()
74}
75
76fn parse_param_hashmap(param: &str) -> HashMap<String, String> {
78 let mut params = HashMap::new();
79 for key_value in param.split(',') {
80 let mut parts = key_value.split('=');
81 let key = parts.next();
82 let value = parts.next();
83 if let (Some(k), Some(v)) = (key, value) {
84 drop(params.insert(k.to_string(), v.to_string()));
85 }
86 }
87 params
88}
89
90fn vec_to_param(param: &[String]) -> String { param.join(",") }
92
93#[derive(Debug, Clone, PartialEq, Eq)]
95pub struct ClientOptionParams(HashMap<String, ClientOption>);
96
97impl ClientOptionParams {
98 pub fn into_params(self) -> HashMap<String, String> {
99 self.0.into_iter().map(|(k, v)| (k, v.to_string())).collect()
100 }
101}
102
103impl std::ops::Deref for ClientOptionParams {
104 type Target = HashMap<String, ClientOption>;
105
106 fn deref(&self) -> &Self::Target { &self.0 }
107}
108
109impl std::ops::DerefMut for ClientOptionParams {
110 fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 }
111}
112
113#[derive(Debug, Clone, PartialEq, Eq, Hash)]
116pub enum ClientOption {
117 Secret(Secret),
118 Value(String),
119}
120
121impl std::fmt::Display for ClientOption {
122 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123 match self {
124 Self::Secret(s) => write!(f, "{}", s.get()),
125 Self::Value(s) => write!(f, "{s}"),
126 }
127 }
128}
129
130pub fn pool_builder_to_params(
136 endpoint: impl Into<String>,
137 builder: &ArrowConnectionPoolBuilder,
138) -> Result<ClientOptionParams> {
139 let mut params = [
140 (ENDPOINT_PARAM, ClientOption::Value(endpoint.into())),
141 (USERNAME_PARAM, ClientOption::Value(builder.client_options().username.clone())),
142 (PASSWORD_PARAM, ClientOption::Secret(builder.client_options().password.clone())),
143 (
144 DEFAULT_DATABASE_PARAM,
145 ClientOption::Value(builder.client_options().default_database.clone()),
146 ),
147 (COMPRESSION_PARAM, ClientOption::Value(builder.client_options().compression.to_string())),
148 ]
149 .into_iter()
150 .map(|(k, v)| (k.to_string(), v))
151 .collect::<HashMap<_, _>>();
152
153 if let Some(domain) = builder.client_options().domain.as_ref() {
154 drop(params.insert(DOMAIN_PARAM.into(), ClientOption::Value(domain.clone())));
155 }
156 if let Some(cafile) = builder.client_options().cafile.as_ref() {
157 drop(params.insert(
158 CAFILE_PARAM.into(),
159 ClientOption::Value(cafile.to_string_lossy().to_string()),
160 ));
161 }
162 if builder.client_options().use_tls {
163 drop(params.insert(USE_TLS_PARAM.into(), ClientOption::Value("true".to_string())));
164 }
165 if builder.client_options().ext.arrow.is_some_and(|a| a.strings_as_strings) {
166 drop(
167 params.insert(STRINGS_AS_STRINGS_PARAM.into(), ClientOption::Value("true".to_string())),
168 );
169 }
170
171 #[cfg(feature = "cloud")]
172 if let Some(to) = builder.client_options().ext.cloud.timeout {
173 drop(params.insert(CLOUD_TIMEOUT_PARAM.into(), ClientOption::Value(to.to_string())));
174 }
175
176 #[cfg(feature = "cloud")]
177 if builder.client_options().ext.cloud.wakeup {
178 drop(params.insert(CLOUD_WAKEUP_PARAM.into(), ClientOption::Value("true".to_string())));
179 }
180
181 if let Some(settings) = builder.client_settings() {
183 let settings = settings.encode_to_key_value_strings();
184 for (name, setting) in settings {
185 let previous = params.insert(name, ClientOption::Value(setting));
186 if previous.is_some() {
187 return Err(DataFusionError::External(
188 "Settings cannot include keys used in ClientOptions".into(),
189 ));
190 }
191 }
192 }
193
194 Ok(ClientOptionParams(params))
195}
196
197pub fn params_to_pool_builder<S: ::std::hash::BuildHasher>(
202 endpoint: impl Into<Destination>,
203 params: &mut HashMap<String, String, S>,
204 ignore_settings: bool,
205) -> Result<ArrowConnectionPoolBuilder> {
206 let destination = endpoint.into();
207 let endpoint = destination.to_string();
208
209 let username = params.remove(USERNAME_PARAM).unwrap_or("default".into());
211 let password = params.remove(PASSWORD_PARAM).map(Secret::new).unwrap_or_default();
212
213 drop(params.remove(DEFAULT_DATABASE_PARAM));
215 let default_database = "default";
216
217 let domain = params.remove(DOMAIN_PARAM);
218 let cafile =
219 params.remove(CAFILE_PARAM).map(|c| PathBuf::from_str(&c)).transpose().map_err(|e| {
220 DataFusionError::External(format!("Cannot convert cafile to path: {e}").into())
221 })?;
222 let use_tls = params.remove(USE_TLS_PARAM).is_some_and(|v| v == "true" || v == "1")
223 || endpoint.starts_with("https");
224 let compression = params
225 .remove(COMPRESSION_PARAM)
226 .map(|c| CompressionMethod::from(c.as_str()))
227 .unwrap_or_default();
228 let strings_as_strings = params.remove(STRINGS_AS_STRINGS_PARAM).map(|s| s == "true");
229 let arrow_options = strings_as_strings
230 .map_or(ArrowOptions::default().with_strings_as_strings(true), |s| {
231 ArrowOptions::default().with_strings_as_strings(s)
232 });
233 #[cfg(feature = "cloud")]
234 let cloud_timeout = if let Some(to) = params.remove(CLOUD_TIMEOUT_PARAM) {
235 to.parse::<u64>().ok()
236 } else {
237 None
238 };
239 #[cfg(feature = "cloud")]
240 let cloud_wakeup = params.remove(CLOUD_WAKEUP_PARAM).is_some();
241
242 let pool_max_size = params.remove(POOL_MAX_SIZE_PARAM).and_then(|p| p.parse::<u32>().ok());
244 let pool_min_idle = params.remove(POOL_MIN_IDLE_PARAM).and_then(|p| p.parse::<u32>().ok());
245 let pool_test_on_checkout =
246 params.remove(POOL_TEST_ON_CHECK_OUT_PARAM).is_some_and(|s| s == "true");
247 let pool_max_lifetime =
248 params.remove(POOL_MAX_LIFETIME_PARAM).and_then(|p| p.parse::<u64>().ok());
249 let pool_idle_timeout =
250 params.remove(POOL_IDLE_TIMEOUT_PARAM).and_then(|p| p.parse::<u64>().ok());
251 let pool_connection_timeout =
252 params.remove(POOL_CONNECTION_TIMEOUT_PARAM).and_then(|p| p.parse::<u64>().ok());
253 let pool_retry_connection =
254 params.remove(POOL_RETRY_CONNECTION_PARAM).is_some_and(|p| p == "true");
255
256 let settings = if ignore_settings || params.is_empty() {
258 None
259 } else {
260 let mut settings = Settings::default();
261 for (name, setting) in params.drain() {
262 if !ALL_PARAMS.contains(&name.as_str()) {
263 settings.add_setting(&name, setting);
264 }
265 }
266 Some(settings)
267 };
268
269 let builder = ArrowConnectionPoolBuilder::new(destination)
270 .configure_client(|c| c.with_arrow_options(default_arrow_options()))
271 .configure_client(|c| {
272 let builder = c
273 .with_username(username)
274 .with_password(password)
275 .with_database(default_database)
276 .with_compression(compression)
277 .with_tls(use_tls)
278 .with_arrow_options(arrow_options)
279 .with_settings(settings.unwrap_or_default());
280 #[cfg(feature = "cloud")]
281 let builder = builder.with_cloud_wakeup(cloud_wakeup);
282 #[cfg(feature = "cloud")]
283 let builder =
284 if let Some(to) = cloud_timeout { builder.with_cloud_timeout(to) } else { builder };
285 let builder =
286 if let Some(domain) = domain { builder.with_domain(domain) } else { builder };
287 if let Some(cafile) = cafile { builder.with_cafile(cafile) } else { builder }
288 })
289 .configure_pool(|pool| {
290 let pool = if let Some(max) = pool_max_size { pool.max_size(max) } else { pool };
291 let pool = if let Some(to) = pool_connection_timeout {
292 pool.connection_timeout(Duration::from_millis(to))
293 } else {
294 pool
295 };
296
297 pool.min_idle(pool_min_idle)
298 .test_on_check_out(pool_test_on_checkout)
299 .max_lifetime(pool_max_lifetime.map(Duration::from_millis))
300 .min_idle(pool_min_idle)
301 .idle_timeout(pool_idle_timeout.map(Duration::from_millis))
302 .retry_connection(pool_retry_connection)
303 });
304
305 Ok(builder)
306}
307
308pub fn create_options_to_params(create_options: CreateOptions) -> ClientOptionParams {
311 let params = HashMap::from_iter([
312 (ENGINE_PARAM.into(), ClientOption::Value(create_options.engine)),
313 (ORDER_BY_PARAM.into(), ClientOption::Value(vec_to_param(&create_options.order_by))),
314 (
315 PRIMARY_KEYS_PARAM.into(),
316 ClientOption::Value(vec_to_param(&create_options.primary_keys)),
317 ),
318 (
319 PARTITION_BY_PARAM.into(),
320 ClientOption::Value(create_options.partition_by.unwrap_or_default()),
321 ),
322 (SAMPLING_PARAM.into(), ClientOption::Value(create_options.sampling.unwrap_or_default())),
323 (TTL_PARAM.into(), ClientOption::Value(create_options.ttl.unwrap_or_default())),
324 (
325 DEFAULTS_FOR_NULLABLE_PARAM.into(),
326 ClientOption::Value(
327 if create_options.defaults_for_nullable { "true" } else { "false" }.into(),
328 ),
329 ),
330 ]);
331
332 ClientOptionParams(params)
333}
334
335pub fn params_to_create_options<S: ::std::hash::BuildHasher>(
341 params: &mut HashMap<String, String, S>,
342 column_defaults: &HashMap<String, Expr, S>,
343) -> Result<CreateOptions> {
344 let Some(engine) = params.remove(ENGINE_PARAM) else {
345 return exec_err!("Missing engine for table");
346 };
347
348 let options = CreateOptions::new(&engine)
349 .with_order_by(
350 ¶ms.remove(ORDER_BY_PARAM).map(|p| parse_param_vec(&p)).unwrap_or_default(),
351 )
352 .with_primary_keys(
353 ¶ms.remove(PRIMARY_KEYS_PARAM).map(|p| parse_param_vec(&p)).unwrap_or_default(),
354 )
355 .with_partition_by(params.remove(PARTITION_BY_PARAM).unwrap_or_default())
356 .with_sample_by(params.remove(SAMPLING_PARAM).unwrap_or_default())
357 .with_ttl(params.remove(TTL_PARAM).unwrap_or_default());
358
359 let unparser = Unparser::new(&ClickHouseDialect as &dyn Dialect);
361 let mut defaults = column_defaults
362 .iter()
363 .map(|(col, expr)| {
364 let ast_expr = unparser.expr_to_sql(expr)?;
365 let ch_default = ast_expr_to_clickhouse_default(&ast_expr)?;
366 Ok((col.clone(), ch_default))
367 })
368 .collect::<Result<HashMap<_, _>>>()?;
369
370 if let Some(defs) = params.remove(DEFAULTS_PARAM) {
371 defaults.extend(parse_param_hashmap(&defs));
372 }
373
374 let options =
375 if defaults.is_empty() { options } else { options.with_defaults(defaults.into_iter()) };
376
377 let options = if params.remove(DEFAULTS_FOR_NULLABLE_PARAM).is_some_and(|p| p == "true") {
378 options.with_defaults_for_nullable()
379 } else {
380 options
381 };
382
383 Ok(if params.is_empty() {
384 options
385 } else {
386 let mut settings = Settings::default();
388 for (name, setting) in params.drain() {
389 if !ALL_PARAMS.contains(&name.as_str()) {
390 settings.add_setting(&name, setting);
391 }
392 }
393 options.with_settings(settings)
394 })
395}
396
397pub(crate) fn ast_expr_to_clickhouse_default(expr: &ast::Expr) -> Result<String> {
399 if let ast::Expr::Value(ast::ValueWithSpan { value, .. }) = expr {
400 match value {
401 ast::Value::SingleQuotedString(s) => {
402 if s.starts_with('\'') && s.ends_with('\'') {
403 Ok(s.clone())
404 } else if s.starts_with('"') && s.ends_with('"') {
405 Ok(s.trim_matches('"').to_string())
406 } else {
407 Ok(format!("'{s}'"))
408 }
409 }
410 ast::Value::DoubleQuotedString(s) => Ok(s.clone()),
412 ast::Value::Number(n, _) => Ok(n.clone()),
413 ast::Value::Boolean(b) => Ok(if *b { "1" } else { "0" }.to_string()),
414 ast::Value::Null => Ok("NULL".to_string()),
415 _ => plan_err!("Unsupported default value: {value:?}"),
416 }
417 } else {
418 plan_err!("Unsupported default expression: {expr:?}")
419 }
420}
421
422pub(crate) fn default_str_to_expr(value: &str) -> Expr {
423 let is_quoted = |s: &str| {
424 (s.starts_with('\'') && s.ends_with('\'')) || (s.starts_with('"') && s.ends_with('"'))
425 };
426 match value {
427 "true" => lit(true),
428 "false" => lit(false),
429 s if !is_quoted(s) && s.parse::<i64>().is_ok() => lit(s.parse::<i64>().unwrap()),
430 s if !is_quoted(s) && s.parse::<f64>().is_ok() => lit(s.parse::<f64>().unwrap()),
431 s => lit(s),
432 }
433}
434
435#[cfg(all(test, feature = "test-utils"))]
436mod tests {
437 use std::collections::HashMap;
438
439 use clickhouse_arrow::{ArrowConnectionPoolBuilder, CompressionMethod, Destination};
440
441 use super::*;
442
443 #[test]
444 fn test_parse_param_hashmap_basic() {
445 let result = parse_param_hashmap("key1=value1,key2=value2");
446 let mut expected = HashMap::new();
447 drop(expected.insert("key1".to_string(), "value1".to_string()));
448 drop(expected.insert("key2".to_string(), "value2".to_string()));
449 assert_eq!(result, expected);
450 }
451
452 #[test]
453 fn test_parse_param_hashmap_empty() {
454 let result = parse_param_hashmap("");
455 assert!(result.is_empty());
456 }
457
458 #[test]
459 fn test_parse_param_hashmap_single_pair() {
460 let result = parse_param_hashmap("single=value");
461 let mut expected = HashMap::new();
462 drop(expected.insert("single".to_string(), "value".to_string()));
463 assert_eq!(result, expected);
464 }
465
466 #[test]
467 fn test_parse_param_hashmap_malformed() {
468 let result = parse_param_hashmap("key1=value1,malformed,key2=value2");
470 let mut expected = HashMap::new();
471 drop(expected.insert("key1".to_string(), "value1".to_string()));
472 drop(expected.insert("key2".to_string(), "value2".to_string()));
473 assert_eq!(result, expected);
474 }
475
476 #[test]
477 fn test_parse_param_hashmap_empty_values() {
478 let result = parse_param_hashmap("key1=,key2=value2");
479 let mut expected = HashMap::new();
480 drop(expected.insert("key1".to_string(), String::new()));
481 drop(expected.insert("key2".to_string(), "value2".to_string()));
482 assert_eq!(result, expected);
483 }
484
485 #[test]
486 fn test_pool_builder_to_params_basic() {
487 let destination = Destination::from("http://localhost:8123");
488 let builder = ArrowConnectionPoolBuilder::new(destination)
489 .configure_client(|c| c.with_username("test_user").with_database("test_db"));
490
491 let result = pool_builder_to_params("http://localhost:8123", &builder);
492 assert!(result.is_ok());
493
494 let params = result.unwrap();
495 assert_eq!(params.get(ENDPOINT_PARAM).unwrap().to_string(), "http://localhost:8123");
496 assert_eq!(params.get(USERNAME_PARAM).unwrap().to_string(), "test_user");
497 assert_eq!(params.get(DEFAULT_DATABASE_PARAM).unwrap().to_string(), "test_db");
498 }
499
500 #[test]
501 fn test_pool_builder_to_params_with_password() {
502 let destination = Destination::from("http://localhost:8123");
503 let builder = ArrowConnectionPoolBuilder::new(destination).configure_client(|c| {
504 c.with_username("test_user")
505 .with_password(Secret::new("secret_password"))
506 .with_database("test_db")
507 });
508
509 let result = pool_builder_to_params("http://localhost:8123", &builder);
510 assert!(result.is_ok());
511
512 let params = result.unwrap();
513 assert_eq!(params.get(PASSWORD_PARAM).unwrap().to_string(), "secret_password");
514 }
515
516 #[test]
517 fn test_pool_builder_to_params_with_compression() {
518 let destination = Destination::from("http://localhost:8123");
519 let builder = ArrowConnectionPoolBuilder::new(destination).configure_client(|c| {
520 c.with_username("test_user").with_compression(CompressionMethod::LZ4)
521 });
522
523 let result = pool_builder_to_params("http://localhost:8123", &builder);
524 assert!(result.is_ok());
525
526 let params = result.unwrap();
527 assert_eq!(
528 params.get(COMPRESSION_PARAM).unwrap().to_string(),
529 format!("{}", CompressionMethod::LZ4)
530 );
531 }
532
533 #[test]
534 fn test_pool_builder_to_params_with_tls() {
535 let destination = Destination::from("https://localhost:8443");
536 let builder =
537 ArrowConnectionPoolBuilder::new(destination).configure_client(|c| c.with_tls(true));
538
539 let result = pool_builder_to_params("https://localhost:8443", &builder);
540 assert!(result.is_ok());
541
542 let params = result.unwrap();
543 assert_eq!(params.get(USE_TLS_PARAM).unwrap().to_string(), "true");
544 }
545
546 #[test]
547 fn test_pool_builder_to_params_with_domain() {
548 let destination = Destination::from("http://localhost:8123");
549 let builder = ArrowConnectionPoolBuilder::new(destination)
550 .configure_client(|c| c.with_domain("test.domain.com"));
551
552 let result = pool_builder_to_params("http://localhost:8123", &builder);
553 assert!(result.is_ok());
554
555 let params = result.unwrap();
556 assert_eq!(params.get(DOMAIN_PARAM).unwrap().to_string(), "test.domain.com");
557 }
558
559 #[test]
560 fn test_params_to_pool_builder_basic() {
561 let mut params = HashMap::new();
562 drop(params.insert(USERNAME_PARAM.to_string(), "test_user".to_string()));
563 drop(params.insert(PASSWORD_PARAM.to_string(), "test_password".to_string()));
564 drop(params.insert(DEFAULT_DATABASE_PARAM.to_string(), "test_db".to_string()));
565
566 let destination = Destination::from("http://localhost:8123");
567 let result = params_to_pool_builder(destination, &mut params, false);
568 assert!(result.is_ok());
569
570 let builder = result.unwrap();
571 assert_eq!(builder.client_options().username, "test_user");
572 assert_eq!(builder.client_options().password.get(), "test_password");
573 assert_eq!(builder.client_options().default_database, "default");
575 }
576
577 #[test]
578 fn test_params_to_pool_builder_with_defaults() {
579 let mut params = HashMap::new();
580 let destination = Destination::from("http://localhost:8123");
583 let result = params_to_pool_builder(destination, &mut params, false);
584 assert!(result.is_ok());
585
586 let builder = result.unwrap();
587 assert_eq!(builder.client_options().username, "default");
588 assert_eq!(builder.client_options().password.get(), "");
589 assert_eq!(builder.client_options().default_database, "default");
590 }
591
592 #[test]
593 fn test_params_to_pool_builder_with_compression() {
594 let mut params = HashMap::new();
595 drop(params.insert(COMPRESSION_PARAM.to_string(), "lz4".to_string()));
596
597 let destination = Destination::from("http://localhost:8123");
598 let result = params_to_pool_builder(destination, &mut params, false);
599 assert!(result.is_ok());
600
601 let builder = result.unwrap();
602 assert_eq!(builder.client_options().compression, CompressionMethod::LZ4);
603 }
604
605 #[test]
606 fn test_params_to_pool_builder_with_tls_flag() {
607 let mut params = HashMap::new();
608 drop(params.insert(USE_TLS_PARAM.to_string(), "true".to_string()));
609
610 let destination = Destination::from("http://localhost:8123");
611 let result = params_to_pool_builder(destination, &mut params, false);
612 assert!(result.is_ok());
613
614 let builder = result.unwrap();
615 assert!(builder.client_options().use_tls);
616 }
617
618 #[test]
619 fn test_params_to_pool_builder_with_tls_from_https() {
620 let mut params = HashMap::new();
621 let destination = Destination::from("https://localhost:8443");
624 let result = params_to_pool_builder(destination, &mut params, false);
625 assert!(result.is_ok());
626
627 let builder = result.unwrap();
628 assert!(builder.client_options().use_tls);
629 }
630
631 #[test]
632 fn test_params_to_pool_builder_with_domain() {
633 let mut params = HashMap::new();
634 drop(params.insert(DOMAIN_PARAM.to_string(), "example.com".to_string()));
635
636 let destination = Destination::from("http://localhost:8123");
637 let result = params_to_pool_builder(destination, &mut params, false);
638 assert!(result.is_ok());
639
640 let builder = result.unwrap();
641 assert_eq!(builder.client_options().domain, Some("example.com".to_string()));
642 }
643
644 #[test]
645 fn test_params_to_pool_builder_with_strings_as_strings() {
646 let mut params = HashMap::new();
647 drop(params.insert(STRINGS_AS_STRINGS_PARAM.to_string(), "true".to_string()));
648
649 let destination = Destination::from("http://localhost:8123");
650 let result = params_to_pool_builder(destination, &mut params, false);
651 assert!(result.is_ok());
652
653 let builder = result.unwrap();
654 assert!(builder.client_options().ext.arrow.unwrap().strings_as_strings);
655 }
656
657 #[test]
658 fn test_params_to_pool_builder_with_pool_settings() {
659 let mut params = HashMap::new();
660 drop(params.insert(POOL_MAX_SIZE_PARAM.to_string(), "20".to_string()));
661 drop(params.insert(POOL_MIN_IDLE_PARAM.to_string(), "5".to_string()));
662 drop(params.insert(POOL_TEST_ON_CHECK_OUT_PARAM.to_string(), "true".to_string()));
663
664 let destination = Destination::from("http://localhost:8123");
665 let result = params_to_pool_builder(destination, &mut params, false);
666 assert!(result.is_ok());
667
668 let _builder = result.unwrap();
671 }
672
673 #[test]
674 fn test_params_to_pool_builder_ignore_settings() {
675 let mut params = HashMap::new();
676 drop(params.insert("custom_setting".to_string(), "custom_value".to_string()));
677
678 let destination = Destination::from("http://localhost:8123");
679 let result = params_to_pool_builder(destination, &mut params, true);
680 assert!(result.is_ok());
681
682 let _builder = result.unwrap();
684 assert!(params.contains_key("custom_setting"));
686 }
687
688 #[test]
689 fn test_roundtrip_conversion() {
690 let original_destination = Destination::from("http://localhost:8123");
692 let original_builder = ArrowConnectionPoolBuilder::new(original_destination.clone())
693 .configure_client(|c| {
694 c.with_username("test_user")
695 .with_password(Secret::new("test_password"))
696 .with_database("test_db")
697 .with_compression(CompressionMethod::LZ4)
698 });
699
700 let params_result = pool_builder_to_params("http://localhost:8123", &original_builder);
702 assert!(params_result.is_ok());
703
704 let client_params = params_result.unwrap();
705 let mut string_params = client_params.into_params();
706
707 let builder_result =
709 params_to_pool_builder(original_destination, &mut string_params, false);
710 assert!(builder_result.is_ok());
711
712 let new_builder = builder_result.unwrap();
713
714 assert_eq!(new_builder.client_options().username, "test_user");
716 assert_eq!(new_builder.client_options().password.get(), "test_password");
717 assert_eq!(new_builder.client_options().compression, CompressionMethod::LZ4);
718 }
720
721 #[test]
722 fn test_client_option_display() {
723 let secret_option = ClientOption::Secret(Secret::new("secret_value"));
724 let value_option = ClientOption::Value("plain_value".to_string());
725
726 assert_eq!(secret_option.to_string(), "secret_value");
727 assert_eq!(value_option.to_string(), "plain_value");
728 }
729
730 #[test]
731 fn test_client_option_params_deref() {
732 let mut params = HashMap::new();
733 drop(params.insert("key1".to_string(), ClientOption::Value("value1".to_string())));
734 drop(params.insert("key2".to_string(), ClientOption::Secret(Secret::new("secret"))));
735
736 let client_params = ClientOptionParams(params);
737
738 assert_eq!(client_params.get("key1").unwrap().to_string(), "value1");
740 assert_eq!(client_params.get("key2").unwrap().to_string(), "secret");
741 }
742
743 #[test]
744 fn test_client_option_params_into_params() {
745 let mut params = HashMap::new();
746 drop(params.insert("key1".to_string(), ClientOption::Value("value1".to_string())));
747 drop(params.insert("key2".to_string(), ClientOption::Secret(Secret::new("secret"))));
748
749 let client_params = ClientOptionParams(params);
750 let string_params = client_params.into_params();
751
752 assert_eq!(string_params.get("key1").unwrap(), "value1");
753 assert_eq!(string_params.get("key2").unwrap(), "secret");
754 }
755}