1use crate::error::{Error, Result};
4use crate::traits::ClickTypeTransport;
5use async_trait::async_trait;
6
/// Canonicalizes a ClickHouse type name so two spellings can be compared.
///
/// Every whitespace character is removed (including interior ones) and the
/// remainder is lowercased, so `Array( String )` and `array(string)` produce
/// the same text.
fn normalize_clickhouse_type(type_name: &str) -> String {
    // Concatenating the non-whitespace runs drops all whitespace in one pass.
    let compact: String = type_name.split_whitespace().collect();
    compact.to_lowercase()
}
17
/// Compression mode selectable on the client.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub enum Compression {
    /// No compression; this is the default variant.
    #[default]
    None,
    /// LZ4 compression; only compiled in with the `compression` feature.
    #[cfg(feature = "compression")]
    Lz4,
}
28
29#[cfg(feature = "clickhouse-backend")]
30use clickhouse::Client as ClickHouseClient;
31
32#[async_trait]
35impl ClickTypeTransport for Client {
36 async fn insert_binary(&self, table_name: &str, data: &[u8]) -> Result<()> {
37 self.insert_binary(table_name, data).await
38 }
39
40 async fn validate_schema(&self, table_name: &str, expected_columns: &[(&str, String)]) -> Result<()> {
41 use tracing::{debug, warn};
42
43 debug!(
44 table = %table_name,
45 columns = expected_columns.len(),
46 "Validating table schema"
47 );
48
49 let describe_query = format!("DESCRIBE TABLE {}", table_name);
51
52 #[derive(clickhouse::Row, serde::Deserialize)]
53 struct ColumnDescription {
54 name: String,
55 #[serde(rename = "type")]
56 type_name: String,
57 default_type: String,
58 #[serde(default)]
59 #[allow(dead_code)]
60 default_expression: String,
61 #[serde(default)]
62 #[allow(dead_code)]
63 comment: String,
64 #[serde(default)]
65 #[allow(dead_code)]
66 codec_expression: String,
67 #[serde(default)]
68 #[allow(dead_code)]
69 ttl_expression: String,
70 }
71
72 let actual_columns: Vec<ColumnDescription> = self.query(&describe_query).await?;
73
74 let insertable_actual: Vec<&ColumnDescription> = actual_columns
76 .iter()
77 .filter(|col| col.default_type != "MATERIALIZED" && col.default_type != "ALIAS")
78 .collect();
79
80 let mut errors = Vec::new();
81
82 if expected_columns.len() != insertable_actual.len() {
84 errors.push(format!(
85 "Column count mismatch: struct has {} fields, table has {} insertable columns",
86 expected_columns.len(),
87 insertable_actual.len()
88 ));
89 } else {
90 for (idx, (expected_name, expected_type)) in expected_columns.iter().enumerate() {
92 let actual_col = &insertable_actual[idx];
93
94 if *expected_name != actual_col.name {
96 errors.push(format!(
97 "Column order mismatch at position {}: struct has '{}', table has '{}'",
98 idx, expected_name, actual_col.name
99 ));
100 }
101
102 let expected_normalized = normalize_clickhouse_type(expected_type);
104 let actual_normalized = normalize_clickhouse_type(&actual_col.type_name);
105
106 if expected_normalized != actual_normalized {
107 errors.push(format!(
108 "Column '{}' type mismatch: struct has {}, table has {}",
109 expected_name, expected_type, actual_col.type_name
110 ));
111 }
112 }
113 }
114
115 for actual_col in &actual_columns {
117 let col_exists = expected_columns
118 .iter()
119 .any(|(name, _)| *name == actual_col.name);
120
121 if !col_exists {
122 if actual_col.default_type != "MATERIALIZED" && actual_col.default_type != "ALIAS" {
124 warn!(
125 column = %actual_col.name,
126 "Table has extra column not in struct (may cause issues if schema changes)"
127 );
128 }
129 }
130 }
131
132 if !errors.is_empty() {
133 return Err(Error::SchemaValidation(format!(
134 "Schema validation failed for table '{}':\n{}",
135 table_name,
136 errors.join("\n")
137 )));
138 }
139
140 debug!(table = %table_name, "Schema validation passed");
141 Ok(())
142 }
143}
144
145#[derive(Clone)]
147pub struct Client {
148 #[cfg(feature = "clickhouse-backend")]
149 inner: ClickHouseClient,
150 #[cfg(feature = "clickhouse-backend")]
151 http_client: reqwest::Client,
152 database: String,
153 url: String,
154 user: String,
155 password: String,
156 compression: Compression,
157}
158
159impl Client {
160 pub fn builder() -> ClientBuilder {
162 ClientBuilder::default()
163 }
164
165 #[cfg(feature = "clickhouse-backend")]
167 pub async fn execute(&self, sql: &str) -> Result<()> {
168 self.inner
169 .query(sql)
170 .execute()
171 .await
172 .map_err(|e| Error::Connection(format!("Execute error: {}", e)))?;
173 Ok(())
174 }
175
176 #[cfg(feature = "clickhouse-backend")]
180 pub async fn query<T>(&self, sql: &str) -> Result<Vec<T>>
181 where
182 T: serde::de::DeserializeOwned + clickhouse::Row,
183 {
184 let rows = self.inner
185 .query(sql)
186 .fetch_all::<T>()
187 .await
188 .map_err(|e| Error::Connection(format!("Query error: {}", e)))?;
189
190 Ok(rows)
191 }
192
193 #[cfg(feature = "clickhouse-backend")]
195 pub async fn query_check(&self, sql: &str) -> Result<u64> {
196 let count_sql = format!("SELECT count() FROM ({})", sql);
198
199 #[derive(clickhouse::Row, serde::Deserialize)]
200 struct CountResult {
201 #[serde(rename = "count()")]
202 count: u64,
203 }
204
205 let result = self.inner
206 .query(&count_sql)
207 .fetch_one::<CountResult>()
208 .await
209 .map_err(|e| Error::Connection(format!("Query error: {}", e)))?;
210
211 Ok(result.count)
212 }
213
214 #[cfg(feature = "clickhouse-backend")]
218 pub async fn insert<T>(&self, table_name: &str, rows: &[T]) -> Result<()>
219 where
220 T: serde::Serialize + clickhouse::Row,
221 {
222 if rows.is_empty() {
223 return Ok(());
224 }
225
226 let mut insert = self.inner
227 .insert(table_name)
228 .map_err(|e| Error::Connection(format!("Insert setup error: {}", e)))?;
229
230 for row in rows {
231 insert
232 .write(row)
233 .await
234 .map_err(|e| Error::Serialization(format!("Insert write error: {}", e)))?;
235 }
236
237 insert
238 .end()
239 .await
240 .map_err(|e| Error::Connection(format!("Insert end error: {}", e)))?;
241
242 Ok(())
243 }
244
245 #[cfg(feature = "clickhouse-backend")]
250 pub async fn insert_binary(&self, table_name: &str, data: &[u8]) -> Result<()> {
251 if data.is_empty() {
252 return Ok(());
253 }
254
255 let query = format!("INSERT INTO {} FORMAT RowBinary", table_name);
256 let url = format!("{}/?query={}", self.url, urlencoding::encode(&query));
257
258 let mut request_builder = self.http_client
259 .post(&url)
260 .basic_auth(&self.user, Some(&self.password))
261 .header("Content-Type", "application/octet-stream");
262
263 let body = match self.compression {
264 Compression::None => data.to_vec(),
265 #[cfg(feature = "compression")]
266 Compression::Lz4 => {
267 request_builder = request_builder.header("Content-Encoding", "lz4");
268 lz4_flex::compress_prepend_size(data)
269 }
270 };
271
272 let response = request_builder
273 .body(body)
274 .send()
275 .await
276 .map_err(|e| Error::Connection(format!("HTTP request failed: {}", e)))?;
277
278 if !response.status().is_success() {
279 let status = response.status();
280 let body = response.text().await.unwrap_or_default();
281 return Err(Error::Connection(format!(
282 "Insert failed with status {}: {}",
283 status, body
284 )));
285 }
286
287 Ok(())
288 }
289
290 #[cfg(feature = "clickhouse-backend")]
295 pub async fn validate_schema<T>(&self, table_name: &str) -> Result<()>
296 where
297 T: clicktype_core::traits::ClickTable,
298 {
299 let schema = T::schema();
301 <Self as ClickTypeTransport>::validate_schema(self, table_name, &schema).await
304 }
305
306 pub fn database(&self) -> &str {
308 &self.database
309 }
310
311 #[cfg(not(feature = "clickhouse-backend"))]
312 pub async fn execute(&self, _sql: &str) -> Result<()> {
313 Err(Error::Connection("No backend enabled".to_string()))
314 }
315
316 #[cfg(not(feature = "clickhouse-backend"))]
317 pub async fn query<T>(&self, _sql: &str) -> Result<Vec<T>> {
318 Err(Error::Connection("No backend enabled".to_string()))
319 }
320
321 #[cfg(not(feature = "clickhouse-backend"))]
322 pub async fn query_check(&self, _sql: &str) -> Result<u64> {
323 Err(Error::Connection("No backend enabled".to_string()))
324 }
325
326 #[cfg(not(feature = "clickhouse-backend"))]
327 pub async fn insert<T>(&self, _table_name: &str, _rows: &[T]) -> Result<()> {
328 Err(Error::Connection("No backend enabled".to_string()))
329 }
330}
331
332#[derive(Default, Clone)]
334pub struct ClientBuilder {
335 host: Option<String>,
336 port: Option<u16>,
337 database: Option<String>,
338 user: Option<String>,
339 password: Option<String>,
340 compression: Compression,
341}
342
343impl ClientBuilder {
344 pub fn host(mut self, host: impl Into<String>) -> Self {
346 self.host = Some(host.into());
347 self
348 }
349
350 pub fn port(mut self, port: u16) -> Self {
352 self.port = Some(port);
353 self
354 }
355
356 pub fn database(mut self, database: impl Into<String>) -> Self {
358 self.database = Some(database.into());
359 self
360 }
361
362 pub fn user(mut self, user: impl Into<String>) -> Self {
364 self.user = Some(user.into());
365 self
366 }
367
368 pub fn password(mut self, password: impl Into<String>) -> Self {
370 self.password = Some(password.into());
371 self
372 }
373
374 pub fn compression(mut self, compression: Compression) -> Self {
376 self.compression = compression;
377 self
378 }
379
380 #[cfg(feature = "clickhouse-backend")]
382 pub async fn build(self) -> Result<Client> {
383 let host = self.host.unwrap_or_else(|| "localhost".to_string());
384 let port = self.port.unwrap_or(8123); let database = self.database.unwrap_or_else(|| "default".to_string());
386 let user = self.user.unwrap_or_else(|| "default".to_string());
387 let password = self.password.unwrap_or_else(|| String::new());
388
389 let url = format!("http://{}:{}", host, port);
390
391 let client = ClickHouseClient::default()
392 .with_url(&url)
393 .with_user(&user)
394 .with_password(&password)
395 .with_database(&database);
396
397 let http_client = reqwest::Client::builder()
398 .timeout(std::time::Duration::from_secs(30))
399 .pool_idle_timeout(std::time::Duration::from_secs(90))
400 .pool_max_idle_per_host(32)
401 .build()
402 .map_err(|e| Error::Connection(format!("Failed to build HTTP client: {}", e)))?;
403
404 Ok(Client {
405 inner: client,
406 http_client,
407 database,
408 url,
409 user,
410 password,
411 compression: self.compression,
412 })
413 }
414
415 #[cfg(not(feature = "clickhouse-backend"))]
417 pub async fn build(self) -> Result<Client> {
418 Err(Error::Connection("No backend enabled".to_string()))
419 }
420}