clickhouse_connection_pool/
config.rs1use serde_derive::{Deserialize, Serialize};
2use std::{sync::Arc, time::Duration};
3use url::Url;
4
/// Connect timeout, in seconds, used by `ClickhouseConfig::default()`.
pub const CONNECTION_TIMEOUT_SECONDS_DEFAULT: u64 = 3;
/// Query timeout, in seconds, used by `ClickhouseConfig::default()`.
pub const QUERY_TIMEOUT_SECONDS_DEFAULT: u64 = 10;
/// Value of `max_connections` used by `ClickhouseConfig::default()`.
pub const POOL_MAX_CONNECTIONS_DEFAULT: u32 = 256;
/// Port used by `ClickhouseConfig::default()`.
pub const PORT_DEFAULT: u16 = 9000;

/// Value of `max_retries` used by `RetryConfig::default()`.
pub const MAX_RETRIES_DEFAULT: u32 = 5;
/// Initial backoff, in milliseconds, used by `RetryConfig::default()`.
pub const INITIAL_BACKOFF_MS_DEFAULT: u64 = 100;
/// Backoff cap, in milliseconds, used by `RetryConfig::default()`.
pub const MAX_BACKOFF_MS_DEFAULT: u64 = 10_000;
/// Per-attempt backoff growth factor used by `RetryConfig::default()`.
pub const BACKOFF_MULTIPLIER_DEFAULT: f64 = 2.0;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
16pub struct ClickhouseConfig {
17 pub host: String,
19
20 pub port: u16,
22
23 pub database: String,
25
26 pub username: String,
28
29 pub password: String,
31
32 #[serde(default)]
34 pub connect_timeout_seconds: u64,
35
36 #[serde(default)]
38 pub query_timeout_seconds: u64,
39
40 #[serde(default)]
42 pub max_connections: u32,
43}
44
45impl Default for ClickhouseConfig {
46 fn default() -> Self {
47 Self {
48 host: "localhost".to_string(),
49 port: PORT_DEFAULT,
50 database: "default".to_string(),
51 username: "default".to_string(),
52 password: String::new(),
53 connect_timeout_seconds: CONNECTION_TIMEOUT_SECONDS_DEFAULT,
54 query_timeout_seconds: QUERY_TIMEOUT_SECONDS_DEFAULT,
55 max_connections: POOL_MAX_CONNECTIONS_DEFAULT,
56 }
57 }
58}
59
60impl ClickhouseConfig {
61 pub fn new(
62 host: String,
63 port: u16,
64 database: String,
65 username: String,
66 password: String,
67 connect_timeout_seconds: u64,
68 query_timeout_seconds: u64,
69 max_connections: u32,
70 ) -> Self {
71 Self {
72 host,
73 port,
74 database,
75 username,
76 password,
77 connect_timeout_seconds,
78 query_timeout_seconds,
79 max_connections,
80 }
81 }
82
83 pub fn connection_url(&self) -> Url {
84 let url_str = format!("https://{}:{}", self.host, self.port,);
85
86 Url::parse(&url_str).expect("Failed to parse Clickhouse URL")
87 }
88
89 pub fn authenticated_connection_url(&self) -> Url {
90 let url_str = format!(
91 "https://{}:{}@{}:{}",
92 self.username, self.password, self.host, self.port,
93 );
94
95 Url::parse(&url_str).expect("Failed to parse authenticated Clickhouse URL")
96 }
97
98 pub fn connect_timeout(&self) -> Duration {
99 Duration::from_secs(self.connect_timeout_seconds)
100 }
101
102 pub fn query_timeout(&self) -> Duration {
103 Duration::from_secs(self.query_timeout_seconds)
104 }
105}
106
/// Parameters for retrying with exponential backoff.
///
/// `backoff_duration` turns these values into a concrete delay per attempt.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RetryConfig {
    /// Retry limit. Not read by any code in this file; presumably enforced by
    /// the caller's retry loop (verify against the caller).
    pub max_retries: u32,

    /// Delay in milliseconds that `backoff_duration` returns for attempt 1
    /// (before capping).
    pub initial_backoff_ms: u64,

    /// Upper bound, in milliseconds, on any delay `backoff_duration` returns.
    pub max_backoff_ms: u64,

    /// Factor by which the delay is multiplied for each attempt after the first.
    pub backoff_multiplier: f64,
}
121
122impl Default for RetryConfig {
123 fn default() -> Self {
124 Self {
125 max_retries: MAX_RETRIES_DEFAULT,
126 initial_backoff_ms: INITIAL_BACKOFF_MS_DEFAULT,
127 max_backoff_ms: MAX_BACKOFF_MS_DEFAULT,
128 backoff_multiplier: BACKOFF_MULTIPLIER_DEFAULT,
129 }
130 }
131}
132
133impl RetryConfig {
134 pub fn backoff_duration(&self, attempt: u32) -> Duration {
135 if attempt == 0 {
136 return Duration::from_millis(0);
137 }
138
139 let backoff_ms = (self.initial_backoff_ms as f64
140 * self.backoff_multiplier.powi(attempt as i32 - 1)) as u64;
141 let capped_backoff_ms = std::cmp::min(backoff_ms, self.max_backoff_ms);
142
143 Duration::from_millis(capped_backoff_ms)
144 }
145}
146
/// Bundles the ClickHouse connection settings with the retry policy.
///
/// Both parts are held behind `Arc`, so cloning a `DatalakeConfig` shares the
/// underlying configuration values instead of copying them.
#[derive(Debug, Clone)]
pub struct DatalakeConfig {
    /// Shared ClickHouse connection settings.
    pub clickhouse: Arc<ClickhouseConfig>,
    /// Shared retry/backoff parameters.
    pub retry: Arc<RetryConfig>,
}
152
153impl Default for DatalakeConfig {
154 fn default() -> Self {
155 Self {
156 clickhouse: Arc::new(ClickhouseConfig::default()),
157 retry: Arc::new(RetryConfig::default()),
158 }
159 }
160}
161
162impl DatalakeConfig {
163 pub fn new(clickhouse_config: ClickhouseConfig, retry: RetryConfig) -> Self {
164 Self {
165 clickhouse: Arc::new(clickhouse_config),
166 retry: Arc::new(retry),
167 }
168 }
169}