Skip to main content

clicktype_transport/
client.rs

1//! Client implementation using clickhouse-rs (official HTTP client)
2
3use crate::error::{Error, Result};
4use crate::traits::ClickTypeTransport;
5use async_trait::async_trait;
6
/// Produces a canonical spelling of a ClickHouse type name for comparison.
///
/// All Unicode whitespace is dropped and the remainder is lowercased, so
/// `"Nullable( String )"` and `"nullable(string)"` normalize to the same value.
fn normalize_clickhouse_type(type_name: &str) -> String {
    // `split_whitespace` splits on the same character class as
    // `char::is_whitespace`; concatenating the pieces removes every
    // whitespace character from the input.
    let compact: String = type_name.split_whitespace().collect();
    compact.to_lowercase()
}
17
/// Request-body compression applied by the client to raw binary inserts.
///
/// The default is [`Compression::None`].
#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)]
pub enum Compression {
    /// Send the payload uncompressed.
    #[default]
    None,
    /// Compress the payload with LZ4 (only available with the `compression` feature).
    #[cfg(feature = "compression")]
    Lz4,
}
28
29#[cfg(feature = "clickhouse-backend")]
30use clickhouse::Client as ClickHouseClient;
31
32// ... (existing imports and code) ...
33
34#[async_trait]
35impl ClickTypeTransport for Client {
36    async fn insert_binary(&self, table_name: &str, data: &[u8]) -> Result<()> {
37        self.insert_binary(table_name, data).await
38    }
39
40    async fn validate_schema(&self, table_name: &str, expected_columns: &[(&str, String)]) -> Result<()> {
41        use tracing::{debug, warn};
42
43        debug!(
44            table = %table_name,
45            columns = expected_columns.len(),
46            "Validating table schema"
47        );
48
49        // Query actual schema from ClickHouse
50        let describe_query = format!("DESCRIBE TABLE {}", table_name);
51
52        #[derive(clickhouse::Row, serde::Deserialize)]
53        struct ColumnDescription {
54            name: String,
55            #[serde(rename = "type")]
56            type_name: String,
57            default_type: String,
58            #[serde(default)]
59            #[allow(dead_code)]
60            default_expression: String,
61            #[serde(default)]
62            #[allow(dead_code)]
63            comment: String,
64            #[serde(default)]
65            #[allow(dead_code)]
66            codec_expression: String,
67            #[serde(default)]
68            #[allow(dead_code)]
69            ttl_expression: String,
70        }
71
72        let actual_columns: Vec<ColumnDescription> = self.query(&describe_query).await?;
73
74        // Filter out MATERIALIZED and ALIAS columns (not insertable)
75        let insertable_actual: Vec<&ColumnDescription> = actual_columns
76            .iter()
77            .filter(|col| col.default_type != "MATERIALIZED" && col.default_type != "ALIAS")
78            .collect();
79
80        let mut errors = Vec::new();
81
82        // CRITICAL: Validate column ORDER (RowBinary is position-based!)
83        if expected_columns.len() != insertable_actual.len() {
84            errors.push(format!(
85                "Column count mismatch: struct has {} fields, table has {} insertable columns",
86                expected_columns.len(),
87                insertable_actual.len()
88            ));
89        } else {
90            // Check that each column matches in ORDER
91            for (idx, (expected_name, expected_type)) in expected_columns.iter().enumerate() {
92                let actual_col = &insertable_actual[idx];
93
94                // Check name matches
95                if *expected_name != actual_col.name {
96                    errors.push(format!(
97                        "Column order mismatch at position {}: struct has '{}', table has '{}'",
98                        idx, expected_name, actual_col.name
99                    ));
100                }
101
102                // Check type matches
103                let expected_normalized = normalize_clickhouse_type(expected_type);
104                let actual_normalized = normalize_clickhouse_type(&actual_col.type_name);
105
106                if expected_normalized != actual_normalized {
107                    errors.push(format!(
108                        "Column '{}' type mismatch: struct has {}, table has {}",
109                        expected_name, expected_type, actual_col.type_name
110                    ));
111                }
112            }
113        }
114
115        // Check for extra columns in table (warning only, not an error)
116        for actual_col in &actual_columns {
117            let col_exists = expected_columns
118                .iter()
119                .any(|(name, _)| *name == actual_col.name);
120
121            if !col_exists {
122                // Skip MATERIALIZED and ALIAS columns as they're not insertable
123                if actual_col.default_type != "MATERIALIZED" && actual_col.default_type != "ALIAS" {
124                    warn!(
125                        column = %actual_col.name,
126                        "Table has extra column not in struct (may cause issues if schema changes)"
127                    );
128                }
129            }
130        }
131
132        if !errors.is_empty() {
133            return Err(Error::SchemaValidation(format!(
134                "Schema validation failed for table '{}':\n{}",
135                table_name,
136                errors.join("\n")
137            )));
138        }
139
140        debug!(table = %table_name, "Schema validation passed");
141        Ok(())
142    }
143}
144
145/// Main ClickType client
146#[derive(Clone)]
147pub struct Client {
148    #[cfg(feature = "clickhouse-backend")]
149    inner: ClickHouseClient,
150    #[cfg(feature = "clickhouse-backend")]
151    http_client: reqwest::Client,
152    database: String,
153    url: String,
154    user: String,
155    password: String,
156    compression: Compression,
157}
158
159impl Client {
160    /// Create a new client builder
161    pub fn builder() -> ClientBuilder {
162        ClientBuilder::default()
163    }
164
165    /// Execute a DDL statement (CREATE TABLE, DROP TABLE, etc.)
166    #[cfg(feature = "clickhouse-backend")]
167    pub async fn execute(&self, sql: &str) -> Result<()> {
168        self.inner
169            .query(sql)
170            .execute()
171            .await
172            .map_err(|e| Error::Connection(format!("Execute error: {}", e)))?;
173        Ok(())
174    }
175
176    /// Execute a query and return typed results
177    ///
178    /// The type T must implement serde::Deserialize and clickhouse::Row
179    #[cfg(feature = "clickhouse-backend")]
180    pub async fn query<T>(&self, sql: &str) -> Result<Vec<T>>
181    where
182        T: serde::de::DeserializeOwned + clickhouse::Row,
183    {
184        let rows = self.inner
185            .query(sql)
186            .fetch_all::<T>()
187            .await
188            .map_err(|e| Error::Connection(format!("Query error: {}", e)))?;
189
190        Ok(rows)
191    }
192
193    /// Execute a query and count rows (for verification)
194    #[cfg(feature = "clickhouse-backend")]
195    pub async fn query_check(&self, sql: &str) -> Result<u64> {
196        // Wrap the query with COUNT(*) to get row count efficiently
197        let count_sql = format!("SELECT count() FROM ({})", sql);
198
199        #[derive(clickhouse::Row, serde::Deserialize)]
200        struct CountResult {
201            #[serde(rename = "count()")]
202            count: u64,
203        }
204
205        let result = self.inner
206            .query(&count_sql)
207            .fetch_one::<CountResult>()
208            .await
209            .map_err(|e| Error::Connection(format!("Query error: {}", e)))?;
210
211        Ok(result.count)
212    }
213
214    /// Insert rows using ClickHouse HTTP insert
215    ///
216    /// The type T must implement serde::Serialize and clickhouse::Row
217    #[cfg(feature = "clickhouse-backend")]
218    pub async fn insert<T>(&self, table_name: &str, rows: &[T]) -> Result<()> 
219    where
220        T: serde::Serialize + clickhouse::Row,
221    {
222        if rows.is_empty() {
223            return Ok(());
224        }
225
226        let mut insert = self.inner
227            .insert(table_name)
228            .map_err(|e| Error::Connection(format!("Insert setup error: {}", e)))?;
229
230        for row in rows {
231            insert
232                .write(row)
233                .await
234                .map_err(|e| Error::Serialization(format!("Insert write error: {}", e)))?;
235        }
236
237        insert
238            .end()
239            .await
240            .map_err(|e| Error::Connection(format!("Insert end error: {}", e)))?;
241
242        Ok(())
243    }
244
245    /// Insert raw RowBinary data directly to ClickHouse
246    ///
247    /// This is the low-level method for high-performance batching.
248    /// The data must be pre-serialized in RowBinary format.
249    #[cfg(feature = "clickhouse-backend")]
250    pub async fn insert_binary(&self, table_name: &str, data: &[u8]) -> Result<()> {
251        if data.is_empty() {
252            return Ok(());
253        }
254
255        let query = format!("INSERT INTO {} FORMAT RowBinary", table_name);
256        let url = format!("{}/?query={}", self.url, urlencoding::encode(&query));
257
258        let mut request_builder = self.http_client
259            .post(&url)
260            .basic_auth(&self.user, Some(&self.password))
261            .header("Content-Type", "application/octet-stream");
262
263        let body = match self.compression {
264            Compression::None => data.to_vec(),
265            #[cfg(feature = "compression")]
266            Compression::Lz4 => {
267                request_builder = request_builder.header("Content-Encoding", "lz4");
268                lz4_flex::compress_prepend_size(data)
269            }
270        };
271
272        let response = request_builder
273            .body(body)
274            .send()
275            .await
276            .map_err(|e| Error::Connection(format!("HTTP request failed: {}", e)))?;
277
278        if !response.status().is_success() {
279            let status = response.status();
280            let body = response.text().await.unwrap_or_default();
281            return Err(Error::Connection(format!(
282                "Insert failed with status {}: {}",
283                status, body
284            )));
285        }
286
287        Ok(())
288    }
289
290    /// Validate that the table schema matches the struct definition
291    ///
292    /// This is critical for RowBinary inserts to prevent silent data corruption.
293    /// Call this before starting batch inserts to ensure schema compatibility.
294    #[cfg(feature = "clickhouse-backend")]
295    pub async fn validate_schema<T>(&self, table_name: &str) -> Result<()> 
296    where
297        T: clicktype_core::traits::ClickTable,
298    {
299        // Delegate to the trait implementation
300        let schema = T::schema();
301        // Convert schema to slice of references if needed, T::schema() returns Vec<(&str, &str)> 
302        // Expected signature: &[(&str, &str)]
303        <Self as ClickTypeTransport>::validate_schema(self, table_name, &schema).await
304    }
305
306    /// Get the database name
307    pub fn database(&self) -> &str {
308        &self.database
309    }
310
311    #[cfg(not(feature = "clickhouse-backend"))]
312    pub async fn execute(&self, _sql: &str) -> Result<()> {
313        Err(Error::Connection("No backend enabled".to_string()))
314    }
315
316    #[cfg(not(feature = "clickhouse-backend"))]
317    pub async fn query<T>(&self, _sql: &str) -> Result<Vec<T>> {
318        Err(Error::Connection("No backend enabled".to_string()))
319    }
320
321    #[cfg(not(feature = "clickhouse-backend"))]
322    pub async fn query_check(&self, _sql: &str) -> Result<u64> {
323        Err(Error::Connection("No backend enabled".to_string()))
324    }
325
326    #[cfg(not(feature = "clickhouse-backend"))]
327    pub async fn insert<T>(&self, _table_name: &str, _rows: &[T]) -> Result<()> {
328        Err(Error::Connection("No backend enabled".to_string()))
329    }
330}
331
332/// Client configuration builder
333#[derive(Default, Clone)]
334pub struct ClientBuilder {
335    host: Option<String>,
336    port: Option<u16>,
337    database: Option<String>,
338    user: Option<String>,
339    password: Option<String>,
340    compression: Compression,
341}
342
343impl ClientBuilder {
344    /// Set the ClickHouse host
345    pub fn host(mut self, host: impl Into<String>) -> Self {
346        self.host = Some(host.into());
347        self
348    }
349
350    /// Set the ClickHouse port (default: 8123 for HTTP)
351    pub fn port(mut self, port: u16) -> Self {
352        self.port = Some(port);
353        self
354    }
355
356    /// Set the database name
357    pub fn database(mut self, database: impl Into<String>) -> Self {
358        self.database = Some(database.into());
359        self
360    }
361
362    /// Set the username
363    pub fn user(mut self, user: impl Into<String>) -> Self {
364        self.user = Some(user.into());
365        self
366    }
367
368    /// Set the password
369    pub fn password(mut self, password: impl Into<String>) -> Self {
370        self.password = Some(password.into());
371        self
372    }
373
374    /// Set the compression method
375    pub fn compression(mut self, compression: Compression) -> Self {
376        self.compression = compression;
377        self
378    }
379
380    /// Build the client
381    #[cfg(feature = "clickhouse-backend")]
382    pub async fn build(self) -> Result<Client> {
383        let host = self.host.unwrap_or_else(|| "localhost".to_string());
384        let port = self.port.unwrap_or(8123); // HTTP port by default
385        let database = self.database.unwrap_or_else(|| "default".to_string());
386        let user = self.user.unwrap_or_else(|| "default".to_string());
387        let password = self.password.unwrap_or_else(|| String::new());
388
389        let url = format!("http://{}:{}", host, port);
390
391        let client = ClickHouseClient::default()
392            .with_url(&url)
393            .with_user(&user)
394            .with_password(&password)
395            .with_database(&database);
396
397        let http_client = reqwest::Client::builder()
398            .timeout(std::time::Duration::from_secs(30))
399            .pool_idle_timeout(std::time::Duration::from_secs(90))
400            .pool_max_idle_per_host(32)
401            .build()
402            .map_err(|e| Error::Connection(format!("Failed to build HTTP client: {}", e)))?;
403
404        Ok(Client {
405            inner: client,
406            http_client,
407            database,
408            url,
409            user,
410            password,
411            compression: self.compression,
412        })
413    }
414
415    /// Build the client (stub for when no backend is enabled)
416    #[cfg(not(feature = "clickhouse-backend"))]
417    pub async fn build(self) -> Result<Client> {
418        Err(Error::Connection("No backend enabled".to_string()))
419    }
420}