Skip to main content

clickhouse_connection_pool/
config.rs

1use serde_derive::{Deserialize, Serialize};
2use std::{sync::Arc, time::Duration};
3use url::Url;
4
5pub const CONNECTION_TIMEOUT_SECONDS_DEFAULT: u64 = 3;
6pub const QUERY_TIMEOUT_SECONDS_DEFAULT: u64 = 10;
7pub const POOL_MAX_CONNECTIONS_DEFAULT: u32 = 256;
8pub const PORT_DEFAULT: u16 = 9000;
9
10pub const MAX_RETRIES_DEFAULT: u32 = 5;
11pub const INITIAL_BACKOFF_MS_DEFAULT: u64 = 100;
12pub const MAX_BACKOFF_MS_DEFAULT: u64 = 10_000;
13pub const BACKOFF_MULTIPLIER_DEFAULT: f64 = 2.0;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct ClickhouseConfig {
17    /// Database host
18    pub host: String,
19
20    /// Database port
21    pub port: u16,
22
23    /// Database name
24    pub database: String,
25
26    /// Username for authentication
27    pub username: String,
28
29    /// Password for authentication
30    pub password: String,
31
32    /// Connection timeout in seconds
33    #[serde(default)]
34    pub connect_timeout_seconds: u64,
35
36    /// Query timeout in seconds
37    #[serde(default)]
38    pub query_timeout_seconds: u64,
39
40    /// Max. number of connections in the pool
41    #[serde(default)]
42    pub max_connections: u32,
43}
44
45impl Default for ClickhouseConfig {
46    fn default() -> Self {
47        Self {
48            host: "localhost".to_string(),
49            port: PORT_DEFAULT,
50            database: "default".to_string(),
51            username: "default".to_string(),
52            password: String::new(),
53            connect_timeout_seconds: CONNECTION_TIMEOUT_SECONDS_DEFAULT,
54            query_timeout_seconds: QUERY_TIMEOUT_SECONDS_DEFAULT,
55            max_connections: POOL_MAX_CONNECTIONS_DEFAULT,
56        }
57    }
58}
59
60impl ClickhouseConfig {
61    pub fn new(
62        host: String,
63        port: u16,
64        database: String,
65        username: String,
66        password: String,
67        connect_timeout_seconds: u64,
68        query_timeout_seconds: u64,
69        max_connections: u32,
70    ) -> Self {
71        Self {
72            host,
73            port,
74            database,
75            username,
76            password,
77            connect_timeout_seconds,
78            query_timeout_seconds,
79            max_connections,
80        }
81    }
82
83    pub fn connection_url(&self) -> Url {
84        let url_str = format!("https://{}:{}", self.host, self.port,);
85
86        Url::parse(&url_str).expect("Failed to parse Clickhouse URL")
87    }
88
89    pub fn authenticated_connection_url(&self) -> Url {
90        let url_str = format!(
91            "https://{}:{}@{}:{}",
92            self.username, self.password, self.host, self.port,
93        );
94
95        Url::parse(&url_str).expect("Failed to parse authenticated Clickhouse URL")
96    }
97
98    pub fn connect_timeout(&self) -> Duration {
99        Duration::from_secs(self.connect_timeout_seconds)
100    }
101
102    pub fn query_timeout(&self) -> Duration {
103        Duration::from_secs(self.query_timeout_seconds)
104    }
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
108pub struct RetryConfig {
109    /// Max number of retry attempts
110    pub max_retries: u32,
111
112    /// Initial backoff duration in milliseconds
113    pub initial_backoff_ms: u64,
114
115    /// Max backoff duration in milliseconds
116    pub max_backoff_ms: u64,
117
118    /// Backoff multiplier for exponential backoff
119    pub backoff_multiplier: f64,
120}
121
122impl Default for RetryConfig {
123    fn default() -> Self {
124        Self {
125            max_retries: MAX_RETRIES_DEFAULT,
126            initial_backoff_ms: INITIAL_BACKOFF_MS_DEFAULT,
127            max_backoff_ms: MAX_BACKOFF_MS_DEFAULT,
128            backoff_multiplier: BACKOFF_MULTIPLIER_DEFAULT,
129        }
130    }
131}
132
133impl RetryConfig {
134    pub fn backoff_duration(&self, attempt: u32) -> Duration {
135        if attempt == 0 {
136            return Duration::from_millis(0);
137        }
138
139        let backoff_ms = (self.initial_backoff_ms as f64
140            * self.backoff_multiplier.powi(attempt as i32 - 1)) as u64;
141        let capped_backoff_ms = std::cmp::min(backoff_ms, self.max_backoff_ms);
142
143        Duration::from_millis(capped_backoff_ms)
144    }
145}
146
147#[derive(Debug, Clone)]
148pub struct DatalakeConfig {
149    pub clickhouse: Arc<ClickhouseConfig>,
150    pub retry: Arc<RetryConfig>,
151}
152
153impl Default for DatalakeConfig {
154    fn default() -> Self {
155        Self {
156            clickhouse: Arc::new(ClickhouseConfig::default()),
157            retry: Arc::new(RetryConfig::default()),
158        }
159    }
160}
161
162impl DatalakeConfig {
163    pub fn new(clickhouse_config: ClickhouseConfig, retry: RetryConfig) -> Self {
164        Self {
165            clickhouse: Arc::new(clickhouse_config),
166            retry: Arc::new(retry),
167        }
168    }
169}