clickhouse_datafusion/utils/
params.rs

1use std::collections::HashMap;
2use std::path::PathBuf;
3use std::str::FromStr;
4use std::time::Duration;
5
6use clickhouse_arrow::prelude::Secret;
7use clickhouse_arrow::{
8    ArrowConnectionPoolBuilder, ArrowOptions, CompressionMethod, CreateOptions, Destination,
9    Settings,
10};
11use datafusion::common::{exec_err, plan_err};
12use datafusion::error::{DataFusionError, Result};
13use datafusion::logical_expr::Expr;
14use datafusion::prelude::lit;
15use datafusion::sql::sqlparser::ast;
16use datafusion::sql::unparser::Unparser;
17use datafusion::sql::unparser::dialect::Dialect;
18
19use crate::default_arrow_options;
20use crate::dialect::ClickHouseDialect;
21
/// Reserved ClickHouse connection option parameter names.
pub(crate) const ENDPOINT_PARAM: &str = "endpoint";
pub(crate) const USERNAME_PARAM: &str = "username";
pub(crate) const PASSWORD_PARAM: &str = "password";
pub(crate) const DEFAULT_DATABASE_PARAM: &str = "default_database";
pub(crate) const COMPRESSION_PARAM: &str = "compression";
pub(crate) const DOMAIN_PARAM: &str = "domain";
pub(crate) const CAFILE_PARAM: &str = "cafile";
pub(crate) const USE_TLS_PARAM: &str = "use_tls";
pub(crate) const STRINGS_AS_STRINGS_PARAM: &str = "strings_as_strings";
pub(crate) const CLOUD_TIMEOUT_PARAM: &str = "cloud_timeout";
pub(crate) const CLOUD_WAKEUP_PARAM: &str = "cloud_wakeup";

/// Reserved connection pool parameter names.
pub(crate) const POOL_MAX_SIZE_PARAM: &str = "pool_max_size";
pub(crate) const POOL_MIN_IDLE_PARAM: &str = "pool_min_idle";
pub(crate) const POOL_TEST_ON_CHECK_OUT_PARAM: &str = "pool_test_on_check_out";
pub(crate) const POOL_MAX_LIFETIME_PARAM: &str = "pool_max_lifetime";
pub(crate) const POOL_IDLE_TIMEOUT_PARAM: &str = "pool_idle_timeout";
pub(crate) const POOL_CONNECTION_TIMEOUT_PARAM: &str = "pool_connection_timeout";
pub(crate) const POOL_RETRY_CONNECTION_PARAM: &str = "pool_retry_connection";

/// Reserved table create parameter names.
pub(crate) const ENGINE_PARAM: &str = "engine";
pub(crate) const ORDER_BY_PARAM: &str = "order_by";
pub(crate) const PRIMARY_KEYS_PARAM: &str = "primary_keys";
pub(crate) const PARTITION_BY_PARAM: &str = "partition_by";
pub(crate) const SAMPLING_PARAM: &str = "sampling";
pub(crate) const TTL_PARAM: &str = "ttl";
pub(crate) const DEFAULTS_PARAM: &str = "defaults";
pub(crate) const DEFAULTS_FOR_NULLABLE_PARAM: &str = "defaults_for_nullable";

/// Every reserved parameter name declared above.
///
/// Parameters whose name is NOT in this list are treated as ClickHouse settings when leftover
/// parameters are drained, so every reserved name must be listed here to keep it from being
/// forwarded to the server as a setting.
pub(crate) const ALL_PARAMS: &[&str; 26] = &[
    // Connection options
    ENDPOINT_PARAM,
    USERNAME_PARAM,
    PASSWORD_PARAM,
    DEFAULT_DATABASE_PARAM,
    COMPRESSION_PARAM,
    DOMAIN_PARAM,
    CAFILE_PARAM,
    USE_TLS_PARAM,
    STRINGS_AS_STRINGS_PARAM,
    CLOUD_TIMEOUT_PARAM,
    CLOUD_WAKEUP_PARAM,
    // Pool options
    POOL_MAX_SIZE_PARAM,
    POOL_MIN_IDLE_PARAM,
    POOL_TEST_ON_CHECK_OUT_PARAM,
    POOL_MAX_LIFETIME_PARAM,
    POOL_IDLE_TIMEOUT_PARAM,
    POOL_CONNECTION_TIMEOUT_PARAM,
    POOL_RETRY_CONNECTION_PARAM,
    // Table create options
    ENGINE_PARAM,
    ORDER_BY_PARAM,
    PRIMARY_KEYS_PARAM,
    PARTITION_BY_PARAM,
    SAMPLING_PARAM,
    TTL_PARAM,
    DEFAULTS_PARAM,
    DEFAULTS_FOR_NULLABLE_PARAM,
];
70
/// Splits a comma-separated parameter into its individual entries.
///
/// Each entry is trimmed of surrounding whitespace and empty entries are dropped, so an empty
/// parameter yields an empty vector rather than a vector holding a single empty string.
fn parse_param_vec(param: &str) -> Vec<String> {
    param
        .split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty())
        .map(ToString::to_string)
        .collect()
}
75
/// Parses a comma-separated list of `key=value` pairs into a map.
///
/// Each entry is split on its FIRST `=` only, so values may themselves contain `=`. Keys are
/// trimmed of surrounding whitespace; values are kept verbatim. Entries without an `=` are
/// skipped, and when a key repeats the last occurrence wins.
fn parse_param_hashmap(param: &str) -> HashMap<String, String> {
    param
        .split(',')
        .filter_map(|entry| entry.split_once('='))
        .map(|(key, value)| (key.trim().to_string(), value.to_string()))
        .collect()
}
89
/// Serializes a list of strings into a single comma-separated parameter value.
fn vec_to_param(param: &[String]) -> String {
    const SEPARATOR: &str = ",";
    param.join(SEPARATOR)
}
92
93/// Wrapper for serialized client options
94#[derive(Debug, Clone, PartialEq, Eq)]
95pub struct ClientOptionParams(HashMap<String, ClientOption>);
96
97impl ClientOptionParams {
98    pub fn into_params(self) -> HashMap<String, String> {
99        self.0.into_iter().map(|(k, v)| (k, v.to_string())).collect()
100    }
101}
102
103impl std::ops::Deref for ClientOptionParams {
104    type Target = HashMap<String, ClientOption>;
105
106    fn deref(&self) -> &Self::Target { &self.0 }
107}
108
109impl std::ops::DerefMut for ClientOptionParams {
110    fn deref_mut(&mut self) -> &mut Self::Target { &mut self.0 }
111}
112
113/// Wrapper for serialized client options that impls [`std::fmt::Display`] and ensures secrets are
114/// not logged or deserialized in plain text.
115#[derive(Debug, Clone, PartialEq, Eq, Hash)]
116pub enum ClientOption {
117    Secret(Secret),
118    Value(String),
119}
120
121impl std::fmt::Display for ClientOption {
122    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
123        match self {
124            Self::Secret(s) => write!(f, "{}", s.get()),
125            Self::Value(s) => write!(f, "{s}"),
126        }
127    }
128}
129
130/// [`crate::ClickHouseTableProviderFactory`] must receive all parameters as strings, this
131/// function is helpful to serialize the required options
132///
133/// # Errors
134/// - Returns an error if settings contain `ClientOption` keys
135pub fn pool_builder_to_params(
136    endpoint: impl Into<String>,
137    builder: &ArrowConnectionPoolBuilder,
138) -> Result<ClientOptionParams> {
139    let mut params = [
140        (ENDPOINT_PARAM, ClientOption::Value(endpoint.into())),
141        (USERNAME_PARAM, ClientOption::Value(builder.client_options().username.clone())),
142        (PASSWORD_PARAM, ClientOption::Secret(builder.client_options().password.clone())),
143        (
144            DEFAULT_DATABASE_PARAM,
145            ClientOption::Value(builder.client_options().default_database.clone()),
146        ),
147        (COMPRESSION_PARAM, ClientOption::Value(builder.client_options().compression.to_string())),
148    ]
149    .into_iter()
150    .map(|(k, v)| (k.to_string(), v))
151    .collect::<HashMap<_, _>>();
152
153    if let Some(domain) = builder.client_options().domain.as_ref() {
154        drop(params.insert(DOMAIN_PARAM.into(), ClientOption::Value(domain.clone())));
155    }
156    if let Some(cafile) = builder.client_options().cafile.as_ref() {
157        drop(params.insert(
158            CAFILE_PARAM.into(),
159            ClientOption::Value(cafile.to_string_lossy().to_string()),
160        ));
161    }
162    if builder.client_options().use_tls {
163        drop(params.insert(USE_TLS_PARAM.into(), ClientOption::Value("true".to_string())));
164    }
165    if builder.client_options().ext.arrow.is_some_and(|a| a.strings_as_strings) {
166        drop(
167            params.insert(STRINGS_AS_STRINGS_PARAM.into(), ClientOption::Value("true".to_string())),
168        );
169    }
170
171    #[cfg(feature = "cloud")]
172    if let Some(to) = builder.client_options().ext.cloud.timeout {
173        drop(params.insert(CLOUD_TIMEOUT_PARAM.into(), ClientOption::Value(to.to_string())));
174    }
175
176    #[cfg(feature = "cloud")]
177    if builder.client_options().ext.cloud.wakeup {
178        drop(params.insert(CLOUD_WAKEUP_PARAM.into(), ClientOption::Value("true".to_string())));
179    }
180
181    // Settings
182    if let Some(settings) = builder.client_settings() {
183        let settings = settings.encode_to_key_value_strings();
184        for (name, setting) in settings {
185            let previous = params.insert(name, ClientOption::Value(setting));
186            if previous.is_some() {
187                return Err(DataFusionError::External(
188                    "Settings cannot include keys used in ClientOptions".into(),
189                ));
190            }
191        }
192    }
193
194    Ok(ClientOptionParams(params))
195}
196
197/// Converts a `HashMap` of parameters to an `ArrowConnectionPoolBuilder`.
198///
199/// # Errors
200/// - Returns an error if the parameters are invalid.
201pub fn params_to_pool_builder<S: ::std::hash::BuildHasher>(
202    endpoint: impl Into<Destination>,
203    params: &mut HashMap<String, String, S>,
204    ignore_settings: bool,
205) -> Result<ArrowConnectionPoolBuilder> {
206    let destination = endpoint.into();
207    let endpoint = destination.to_string();
208
209    // ClientOptions
210    let username = params.remove(USERNAME_PARAM).unwrap_or("default".into());
211    let password = params.remove(PASSWORD_PARAM).map(Secret::new).unwrap_or_default();
212
213    // This is set to "default" since datafusion drives the schema. DDL's don't work otherwise
214    drop(params.remove(DEFAULT_DATABASE_PARAM));
215    let default_database = "default";
216
217    let domain = params.remove(DOMAIN_PARAM);
218    let cafile =
219        params.remove(CAFILE_PARAM).map(|c| PathBuf::from_str(&c)).transpose().map_err(|e| {
220            DataFusionError::External(format!("Cannot convert cafile to path: {e}").into())
221        })?;
222    let use_tls = params.remove(USE_TLS_PARAM).is_some_and(|v| v == "true" || v == "1")
223        || endpoint.starts_with("https");
224    let compression = params
225        .remove(COMPRESSION_PARAM)
226        .map(|c| CompressionMethod::from(c.as_str()))
227        .unwrap_or_default();
228    let strings_as_strings = params.remove(STRINGS_AS_STRINGS_PARAM).map(|s| s == "true");
229    let arrow_options = strings_as_strings
230        .map_or(ArrowOptions::default().with_strings_as_strings(true), |s| {
231            ArrowOptions::default().with_strings_as_strings(s)
232        });
233    #[cfg(feature = "cloud")]
234    let cloud_timeout = if let Some(to) = params.remove(CLOUD_TIMEOUT_PARAM) {
235        to.parse::<u64>().ok()
236    } else {
237        None
238    };
239    #[cfg(feature = "cloud")]
240    let cloud_wakeup = params.remove(CLOUD_WAKEUP_PARAM).is_some();
241
242    // Pool settings
243    let pool_max_size = params.remove(POOL_MAX_SIZE_PARAM).and_then(|p| p.parse::<u32>().ok());
244    let pool_min_idle = params.remove(POOL_MIN_IDLE_PARAM).and_then(|p| p.parse::<u32>().ok());
245    let pool_test_on_checkout =
246        params.remove(POOL_TEST_ON_CHECK_OUT_PARAM).is_some_and(|s| s == "true");
247    let pool_max_lifetime =
248        params.remove(POOL_MAX_LIFETIME_PARAM).and_then(|p| p.parse::<u64>().ok());
249    let pool_idle_timeout =
250        params.remove(POOL_IDLE_TIMEOUT_PARAM).and_then(|p| p.parse::<u64>().ok());
251    let pool_connection_timeout =
252        params.remove(POOL_CONNECTION_TIMEOUT_PARAM).and_then(|p| p.parse::<u64>().ok());
253    let pool_retry_connection =
254        params.remove(POOL_RETRY_CONNECTION_PARAM).is_some_and(|p| p == "true");
255
256    // Settings
257    let settings = if ignore_settings || params.is_empty() {
258        None
259    } else {
260        let mut settings = Settings::default();
261        for (name, setting) in params.drain() {
262            if !ALL_PARAMS.contains(&name.as_str()) {
263                settings.add_setting(&name, setting);
264            }
265        }
266        Some(settings)
267    };
268
269    let builder = ArrowConnectionPoolBuilder::new(destination)
270        .configure_client(|c| c.with_arrow_options(default_arrow_options()))
271        .configure_client(|c| {
272            let builder = c
273                .with_username(username)
274                .with_password(password)
275                .with_database(default_database)
276                .with_compression(compression)
277                .with_tls(use_tls)
278                .with_arrow_options(arrow_options)
279                .with_settings(settings.unwrap_or_default());
280            #[cfg(feature = "cloud")]
281            let builder = builder.with_cloud_wakeup(cloud_wakeup);
282            #[cfg(feature = "cloud")]
283            let builder =
284                if let Some(to) = cloud_timeout { builder.with_cloud_timeout(to) } else { builder };
285            let builder =
286                if let Some(domain) = domain { builder.with_domain(domain) } else { builder };
287            if let Some(cafile) = cafile { builder.with_cafile(cafile) } else { builder }
288        })
289        .configure_pool(|pool| {
290            let pool = if let Some(max) = pool_max_size { pool.max_size(max) } else { pool };
291            let pool = if let Some(to) = pool_connection_timeout {
292                pool.connection_timeout(Duration::from_millis(to))
293            } else {
294                pool
295            };
296
297            pool.min_idle(pool_min_idle)
298                .test_on_check_out(pool_test_on_checkout)
299                .max_lifetime(pool_max_lifetime.map(Duration::from_millis))
300                .min_idle(pool_min_idle)
301                .idle_timeout(pool_idle_timeout.map(Duration::from_millis))
302                .retry_connection(pool_retry_connection)
303        });
304
305    Ok(builder)
306}
307
308/// [`crate::ClickHouseTableProviderFactory`] must receive all parameters as strings, this
309/// function is helpful to serialize the required options
310pub fn create_options_to_params(create_options: CreateOptions) -> ClientOptionParams {
311    let params = HashMap::from_iter([
312        (ENGINE_PARAM.into(), ClientOption::Value(create_options.engine)),
313        (ORDER_BY_PARAM.into(), ClientOption::Value(vec_to_param(&create_options.order_by))),
314        (
315            PRIMARY_KEYS_PARAM.into(),
316            ClientOption::Value(vec_to_param(&create_options.primary_keys)),
317        ),
318        (
319            PARTITION_BY_PARAM.into(),
320            ClientOption::Value(create_options.partition_by.unwrap_or_default()),
321        ),
322        (SAMPLING_PARAM.into(), ClientOption::Value(create_options.sampling.unwrap_or_default())),
323        (TTL_PARAM.into(), ClientOption::Value(create_options.ttl.unwrap_or_default())),
324        (
325            DEFAULTS_FOR_NULLABLE_PARAM.into(),
326            ClientOption::Value(
327                if create_options.defaults_for_nullable { "true" } else { "false" }.into(),
328            ),
329        ),
330    ]);
331
332    ClientOptionParams(params)
333}
334
335/// Creates a `CreateOptions` from 'params' (`HashMap<String, String>`) and 'defaults'
336/// (`HashMap<String, Expr>`).
337///
338/// # Errors
339/// - Returns an error if the engine is missing.
340pub fn params_to_create_options<S: ::std::hash::BuildHasher>(
341    params: &mut HashMap<String, String, S>,
342    column_defaults: &HashMap<String, Expr, S>,
343) -> Result<CreateOptions> {
344    let Some(engine) = params.remove(ENGINE_PARAM) else {
345        return exec_err!("Missing engine for table");
346    };
347
348    let options = CreateOptions::new(&engine)
349        .with_order_by(
350            &params.remove(ORDER_BY_PARAM).map(|p| parse_param_vec(&p)).unwrap_or_default(),
351        )
352        .with_primary_keys(
353            &params.remove(PRIMARY_KEYS_PARAM).map(|p| parse_param_vec(&p)).unwrap_or_default(),
354        )
355        .with_partition_by(params.remove(PARTITION_BY_PARAM).unwrap_or_default())
356        .with_sample_by(params.remove(SAMPLING_PARAM).unwrap_or_default())
357        .with_ttl(params.remove(TTL_PARAM).unwrap_or_default());
358
359    // Convert column_defaults to ClickHouse defaults
360    let unparser = Unparser::new(&ClickHouseDialect as &dyn Dialect);
361    let mut defaults = column_defaults
362        .iter()
363        .map(|(col, expr)| {
364            let ast_expr = unparser.expr_to_sql(expr)?;
365            let ch_default = ast_expr_to_clickhouse_default(&ast_expr)?;
366            Ok((col.clone(), ch_default))
367        })
368        .collect::<Result<HashMap<_, _>>>()?;
369
370    if let Some(defs) = params.remove(DEFAULTS_PARAM) {
371        defaults.extend(parse_param_hashmap(&defs));
372    }
373
374    let options =
375        if defaults.is_empty() { options } else { options.with_defaults(defaults.into_iter()) };
376
377    let options = if params.remove(DEFAULTS_FOR_NULLABLE_PARAM).is_some_and(|p| p == "true") {
378        options.with_defaults_for_nullable()
379    } else {
380        options
381    };
382
383    Ok(if params.is_empty() {
384        options
385    } else {
386        // Settings
387        let mut settings = Settings::default();
388        for (name, setting) in params.drain() {
389            if !ALL_PARAMS.contains(&name.as_str()) {
390                settings.add_setting(&name, setting);
391            }
392        }
393        options.with_settings(settings)
394    })
395}
396
397// Convert ast::Expr to ClickHouse default string
398pub(crate) fn ast_expr_to_clickhouse_default(expr: &ast::Expr) -> Result<String> {
399    if let ast::Expr::Value(ast::ValueWithSpan { value, .. }) = expr {
400        match value {
401            ast::Value::SingleQuotedString(s) => {
402                if s.starts_with('\'') && s.ends_with('\'') {
403                    Ok(s.clone())
404                } else if s.starts_with('"') && s.ends_with('"') {
405                    Ok(s.trim_matches('"').to_string())
406                } else {
407                    Ok(format!("'{s}'"))
408                }
409            }
410            // DoubleQuotedString is used to signify do not alter
411            ast::Value::DoubleQuotedString(s) => Ok(s.clone()),
412            ast::Value::Number(n, _) => Ok(n.clone()),
413            ast::Value::Boolean(b) => Ok(if *b { "1" } else { "0" }.to_string()),
414            ast::Value::Null => Ok("NULL".to_string()),
415            _ => plan_err!("Unsupported default value: {value:?}"),
416        }
417    } else {
418        plan_err!("Unsupported default expression: {expr:?}")
419    }
420}
421
422pub(crate) fn default_str_to_expr(value: &str) -> Expr {
423    let is_quoted = |s: &str| {
424        (s.starts_with('\'') && s.ends_with('\'')) || (s.starts_with('"') && s.ends_with('"'))
425    };
426    match value {
427        "true" => lit(true),
428        "false" => lit(false),
429        s if !is_quoted(s) && s.parse::<i64>().is_ok() => lit(s.parse::<i64>().unwrap()),
430        s if !is_quoted(s) && s.parse::<f64>().is_ok() => lit(s.parse::<f64>().unwrap()),
431        s => lit(s),
432    }
433}
434
#[cfg(all(test, feature = "test-utils"))]
mod tests {
    use std::collections::HashMap;

    use clickhouse_arrow::{ArrowConnectionPoolBuilder, CompressionMethod, Destination};

    use super::*;

    const HTTP_ENDPOINT: &str = "http://localhost:8123";
    const HTTPS_ENDPOINT: &str = "https://localhost:8443";

    /// Builds an owned string map from borrowed key/value pairs.
    fn string_map<const N: usize>(pairs: [(&str, &str); N]) -> HashMap<String, String> {
        pairs.into_iter().map(|(key, value)| (key.to_string(), value.to_string())).collect()
    }

    /// Serializes `builder`, panicking if the conversion fails.
    fn to_params(endpoint: &'static str, builder: &ArrowConnectionPoolBuilder) -> ClientOptionParams {
        pool_builder_to_params(endpoint, builder).expect("builder should serialize to params")
    }

    /// Deserializes `params` into a builder, panicking if the conversion fails.
    fn to_builder(
        endpoint: &'static str,
        params: &mut HashMap<String, String>,
        ignore_settings: bool,
    ) -> ArrowConnectionPoolBuilder {
        params_to_pool_builder(Destination::from(endpoint), params, ignore_settings)
            .expect("params should convert to a builder")
    }

    /// Renders the option stored under `key`, panicking if it is absent.
    fn rendered(params: &ClientOptionParams, key: &str) -> String {
        params.get(key).expect("param should be present").to_string()
    }

    #[test]
    fn test_parse_param_hashmap_basic() {
        let parsed = parse_param_hashmap("key1=value1,key2=value2");
        assert_eq!(parsed, string_map([("key1", "value1"), ("key2", "value2")]));
    }

    #[test]
    fn test_parse_param_hashmap_empty() {
        assert!(parse_param_hashmap("").is_empty());
    }

    #[test]
    fn test_parse_param_hashmap_single_pair() {
        let parsed = parse_param_hashmap("single=value");
        assert_eq!(parsed, string_map([("single", "value")]));
    }

    #[test]
    fn test_parse_param_hashmap_malformed() {
        // Entries without an equals sign are skipped
        let parsed = parse_param_hashmap("key1=value1,malformed,key2=value2");
        assert_eq!(parsed, string_map([("key1", "value1"), ("key2", "value2")]));
    }

    #[test]
    fn test_parse_param_hashmap_empty_values() {
        let parsed = parse_param_hashmap("key1=,key2=value2");
        assert_eq!(parsed, string_map([("key1", ""), ("key2", "value2")]));
    }

    #[test]
    fn test_pool_builder_to_params_basic() {
        let builder = ArrowConnectionPoolBuilder::new(Destination::from(HTTP_ENDPOINT))
            .configure_client(|c| c.with_username("test_user").with_database("test_db"));

        let params = to_params(HTTP_ENDPOINT, &builder);
        assert_eq!(rendered(&params, ENDPOINT_PARAM), "http://localhost:8123");
        assert_eq!(rendered(&params, USERNAME_PARAM), "test_user");
        assert_eq!(rendered(&params, DEFAULT_DATABASE_PARAM), "test_db");
    }

    #[test]
    fn test_pool_builder_to_params_with_password() {
        let builder = ArrowConnectionPoolBuilder::new(Destination::from(HTTP_ENDPOINT))
            .configure_client(|c| {
                c.with_username("test_user")
                    .with_password(Secret::new("secret_password"))
                    .with_database("test_db")
            });

        let params = to_params(HTTP_ENDPOINT, &builder);
        assert_eq!(rendered(&params, PASSWORD_PARAM), "secret_password");
    }

    #[test]
    fn test_pool_builder_to_params_with_compression() {
        let builder = ArrowConnectionPoolBuilder::new(Destination::from(HTTP_ENDPOINT))
            .configure_client(|c| {
                c.with_username("test_user").with_compression(CompressionMethod::LZ4)
            });

        let params = to_params(HTTP_ENDPOINT, &builder);
        assert_eq!(rendered(&params, COMPRESSION_PARAM), format!("{}", CompressionMethod::LZ4));
    }

    #[test]
    fn test_pool_builder_to_params_with_tls() {
        let builder = ArrowConnectionPoolBuilder::new(Destination::from(HTTPS_ENDPOINT))
            .configure_client(|c| c.with_tls(true));

        let params = to_params(HTTPS_ENDPOINT, &builder);
        assert_eq!(rendered(&params, USE_TLS_PARAM), "true");
    }

    #[test]
    fn test_pool_builder_to_params_with_domain() {
        let builder = ArrowConnectionPoolBuilder::new(Destination::from(HTTP_ENDPOINT))
            .configure_client(|c| c.with_domain("test.domain.com"));

        let params = to_params(HTTP_ENDPOINT, &builder);
        assert_eq!(rendered(&params, DOMAIN_PARAM), "test.domain.com");
    }

    #[test]
    fn test_params_to_pool_builder_basic() {
        let mut params = string_map([
            (USERNAME_PARAM, "test_user"),
            (PASSWORD_PARAM, "test_password"),
            (DEFAULT_DATABASE_PARAM, "test_db"),
        ]);

        let builder = to_builder(HTTP_ENDPOINT, &mut params, false);
        let options = builder.client_options();
        assert_eq!(options.username, "test_user");
        assert_eq!(options.password.get(), "test_password");
        // Note: default_database is always set to "default" regardless of input
        assert_eq!(options.default_database, "default");
    }

    #[test]
    fn test_params_to_pool_builder_with_defaults() {
        // No username/password provided, so the defaults apply
        let mut params: HashMap<String, String> = HashMap::new();

        let builder = to_builder(HTTP_ENDPOINT, &mut params, false);
        let options = builder.client_options();
        assert_eq!(options.username, "default");
        assert_eq!(options.password.get(), "");
        assert_eq!(options.default_database, "default");
    }

    #[test]
    fn test_params_to_pool_builder_with_compression() {
        let mut params = string_map([(COMPRESSION_PARAM, "lz4")]);

        let builder = to_builder(HTTP_ENDPOINT, &mut params, false);
        assert_eq!(builder.client_options().compression, CompressionMethod::LZ4);
    }

    #[test]
    fn test_params_to_pool_builder_with_tls_flag() {
        let mut params = string_map([(USE_TLS_PARAM, "true")]);

        let builder = to_builder(HTTP_ENDPOINT, &mut params, false);
        assert!(builder.client_options().use_tls);
    }

    #[test]
    fn test_params_to_pool_builder_with_tls_from_https() {
        // No explicit USE_TLS_PARAM, but an https endpoint should enable TLS
        let mut params: HashMap<String, String> = HashMap::new();

        let builder = to_builder(HTTPS_ENDPOINT, &mut params, false);
        assert!(builder.client_options().use_tls);
    }

    #[test]
    fn test_params_to_pool_builder_with_domain() {
        let mut params = string_map([(DOMAIN_PARAM, "example.com")]);

        let builder = to_builder(HTTP_ENDPOINT, &mut params, false);
        assert_eq!(builder.client_options().domain, Some("example.com".to_string()));
    }

    #[test]
    fn test_params_to_pool_builder_with_strings_as_strings() {
        let mut params = string_map([(STRINGS_AS_STRINGS_PARAM, "true")]);

        let builder = to_builder(HTTP_ENDPOINT, &mut params, false);
        assert!(builder.client_options().ext.arrow.unwrap().strings_as_strings);
    }

    #[test]
    fn test_params_to_pool_builder_with_pool_settings() {
        let mut params = string_map([
            (POOL_MAX_SIZE_PARAM, "20"),
            (POOL_MIN_IDLE_PARAM, "5"),
            (POOL_TEST_ON_CHECK_OUT_PARAM, "true"),
        ]);

        // The pool settings are applied via configure_pool, so they can't be inspected
        // directly; this only verifies that the builder is created successfully.
        let _builder = to_builder(HTTP_ENDPOINT, &mut params, false);
    }

    #[test]
    fn test_params_to_pool_builder_ignore_settings() {
        let mut params = string_map([("custom_setting", "custom_value")]);

        // When ignore_settings is true, custom settings should be ignored
        let _builder = to_builder(HTTP_ENDPOINT, &mut params, true);
        // The params HashMap should still contain the custom setting since it's ignored
        assert!(params.contains_key("custom_setting"));
    }

    #[test]
    fn test_roundtrip_conversion() {
        // builder -> params -> builder
        let original_builder = ArrowConnectionPoolBuilder::new(Destination::from(HTTP_ENDPOINT))
            .configure_client(|c| {
                c.with_username("test_user")
                    .with_password(Secret::new("test_password"))
                    .with_database("test_db")
                    .with_compression(CompressionMethod::LZ4)
            });

        let mut string_params = to_params(HTTP_ENDPOINT, &original_builder).into_params();
        let new_builder = to_builder(HTTP_ENDPOINT, &mut string_params, false);

        // Verify key properties match
        let options = new_builder.client_options();
        assert_eq!(options.username, "test_user");
        assert_eq!(options.password.get(), "test_password");
        assert_eq!(options.compression, CompressionMethod::LZ4);
        // Note: database will be "default" due to the forced override in params_to_pool_builder
    }

    #[test]
    fn test_client_option_display() {
        let secret_option = ClientOption::Secret(Secret::new("secret_value"));
        let value_option = ClientOption::Value("plain_value".to_string());

        assert_eq!(secret_option.to_string(), "secret_value");
        assert_eq!(value_option.to_string(), "plain_value");
    }

    /// Two-entry fixture holding one plain value and one secret.
    fn sample_client_params() -> ClientOptionParams {
        ClientOptionParams(HashMap::from([
            ("key1".to_string(), ClientOption::Value("value1".to_string())),
            ("key2".to_string(), ClientOption::Secret(Secret::new("secret"))),
        ]))
    }

    #[test]
    fn test_client_option_params_deref() {
        let client_params = sample_client_params();

        // `get` is reached through the Deref impl
        assert_eq!(client_params.get("key1").unwrap().to_string(), "value1");
        assert_eq!(client_params.get("key2").unwrap().to_string(), "secret");
    }

    #[test]
    fn test_client_option_params_into_params() {
        let string_params = sample_client_params().into_params();

        assert_eq!(string_params.get("key1").unwrap(), "value1");
        assert_eq!(string_params.get("key2").unwrap(), "secret");
    }
}