use crate::error::{Error, Result};
use crate::traits::ClickTypeTransport;
use async_trait::async_trait;
fn normalize_clickhouse_type(type_name: &str) -> String {
type_name
.chars()
.filter(|c| !c.is_whitespace())
.collect::<String>()
.to_lowercase()
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum Compression {
#[default]
None,
#[cfg(feature = "compression")]
Lz4,
}
#[cfg(feature = "clickhouse-backend")]
use clickhouse::Client as ClickHouseClient;
#[async_trait]
impl ClickTypeTransport for Client {
async fn insert_binary(&self, table_name: &str, data: &[u8]) -> Result<()> {
self.insert_binary(table_name, data).await
}
async fn validate_schema(&self, table_name: &str, expected_columns: &[(&str, String)]) -> Result<()> {
use tracing::{debug, warn};
debug!(
table = %table_name,
columns = expected_columns.len(),
"Validating table schema"
);
let describe_query = format!("DESCRIBE TABLE {}", table_name);
#[derive(clickhouse::Row, serde::Deserialize)]
struct ColumnDescription {
name: String,
#[serde(rename = "type")]
type_name: String,
default_type: String,
#[serde(default)]
#[allow(dead_code)]
default_expression: String,
#[serde(default)]
#[allow(dead_code)]
comment: String,
#[serde(default)]
#[allow(dead_code)]
codec_expression: String,
#[serde(default)]
#[allow(dead_code)]
ttl_expression: String,
}
let actual_columns: Vec<ColumnDescription> = self.query(&describe_query).await?;
let insertable_actual: Vec<&ColumnDescription> = actual_columns
.iter()
.filter(|col| col.default_type != "MATERIALIZED" && col.default_type != "ALIAS")
.collect();
let mut errors = Vec::new();
if expected_columns.len() != insertable_actual.len() {
errors.push(format!(
"Column count mismatch: struct has {} fields, table has {} insertable columns",
expected_columns.len(),
insertable_actual.len()
));
} else {
for (idx, (expected_name, expected_type)) in expected_columns.iter().enumerate() {
let actual_col = &insertable_actual[idx];
if *expected_name != actual_col.name {
errors.push(format!(
"Column order mismatch at position {}: struct has '{}', table has '{}'",
idx, expected_name, actual_col.name
));
}
let expected_normalized = normalize_clickhouse_type(expected_type);
let actual_normalized = normalize_clickhouse_type(&actual_col.type_name);
if expected_normalized != actual_normalized {
errors.push(format!(
"Column '{}' type mismatch: struct has {}, table has {}",
expected_name, expected_type, actual_col.type_name
));
}
}
}
for actual_col in &actual_columns {
let col_exists = expected_columns
.iter()
.any(|(name, _)| *name == actual_col.name);
if !col_exists {
if actual_col.default_type != "MATERIALIZED" && actual_col.default_type != "ALIAS" {
warn!(
column = %actual_col.name,
"Table has extra column not in struct (may cause issues if schema changes)"
);
}
}
}
if !errors.is_empty() {
return Err(Error::SchemaValidation(format!(
"Schema validation failed for table '{}':\n{}",
table_name,
errors.join("\n")
)));
}
debug!(table = %table_name, "Schema validation passed");
Ok(())
}
}
#[derive(Clone)]
pub struct Client {
#[cfg(feature = "clickhouse-backend")]
inner: ClickHouseClient,
#[cfg(feature = "clickhouse-backend")]
http_client: reqwest::Client,
database: String,
url: String,
user: String,
password: String,
compression: Compression,
}
impl Client {
pub fn builder() -> ClientBuilder {
ClientBuilder::default()
}
#[cfg(feature = "clickhouse-backend")]
pub async fn execute(&self, sql: &str) -> Result<()> {
self.inner
.query(sql)
.execute()
.await
.map_err(|e| Error::Connection(format!("Execute error: {}", e)))?;
Ok(())
}
#[cfg(feature = "clickhouse-backend")]
pub async fn query<T>(&self, sql: &str) -> Result<Vec<T>>
where
T: serde::de::DeserializeOwned + clickhouse::Row,
{
let rows = self.inner
.query(sql)
.fetch_all::<T>()
.await
.map_err(|e| Error::Connection(format!("Query error: {}", e)))?;
Ok(rows)
}
#[cfg(feature = "clickhouse-backend")]
pub async fn query_check(&self, sql: &str) -> Result<u64> {
let count_sql = format!("SELECT count() FROM ({})", sql);
#[derive(clickhouse::Row, serde::Deserialize)]
struct CountResult {
#[serde(rename = "count()")]
count: u64,
}
let result = self.inner
.query(&count_sql)
.fetch_one::<CountResult>()
.await
.map_err(|e| Error::Connection(format!("Query error: {}", e)))?;
Ok(result.count)
}
#[cfg(feature = "clickhouse-backend")]
pub async fn insert<T>(&self, table_name: &str, rows: &[T]) -> Result<()>
where
T: serde::Serialize + clickhouse::Row,
{
if rows.is_empty() {
return Ok(());
}
let mut insert = self.inner
.insert(table_name)
.map_err(|e| Error::Connection(format!("Insert setup error: {}", e)))?;
for row in rows {
insert
.write(row)
.await
.map_err(|e| Error::Serialization(format!("Insert write error: {}", e)))?;
}
insert
.end()
.await
.map_err(|e| Error::Connection(format!("Insert end error: {}", e)))?;
Ok(())
}
#[cfg(feature = "clickhouse-backend")]
pub async fn insert_binary(&self, table_name: &str, data: &[u8]) -> Result<()> {
if data.is_empty() {
return Ok(());
}
let query = format!("INSERT INTO {} FORMAT RowBinary", table_name);
let url = format!("{}/?query={}", self.url, urlencoding::encode(&query));
let mut request_builder = self.http_client
.post(&url)
.basic_auth(&self.user, Some(&self.password))
.header("Content-Type", "application/octet-stream");
let body = match self.compression {
Compression::None => data.to_vec(),
#[cfg(feature = "compression")]
Compression::Lz4 => {
request_builder = request_builder.header("Content-Encoding", "lz4");
lz4_flex::compress_prepend_size(data)
}
};
let response = request_builder
.body(body)
.send()
.await
.map_err(|e| Error::Connection(format!("HTTP request failed: {}", e)))?;
if !response.status().is_success() {
let status = response.status();
let body = response.text().await.unwrap_or_default();
return Err(Error::Connection(format!(
"Insert failed with status {}: {}",
status, body
)));
}
Ok(())
}
#[cfg(feature = "clickhouse-backend")]
pub async fn validate_schema<T>(&self, table_name: &str) -> Result<()>
where
T: clicktype_core::traits::ClickTable,
{
let schema = T::schema();
<Self as ClickTypeTransport>::validate_schema(self, table_name, &schema).await
}
pub fn database(&self) -> &str {
&self.database
}
#[cfg(not(feature = "clickhouse-backend"))]
pub async fn execute(&self, _sql: &str) -> Result<()> {
Err(Error::Connection("No backend enabled".to_string()))
}
#[cfg(not(feature = "clickhouse-backend"))]
pub async fn query<T>(&self, _sql: &str) -> Result<Vec<T>> {
Err(Error::Connection("No backend enabled".to_string()))
}
#[cfg(not(feature = "clickhouse-backend"))]
pub async fn query_check(&self, _sql: &str) -> Result<u64> {
Err(Error::Connection("No backend enabled".to_string()))
}
#[cfg(not(feature = "clickhouse-backend"))]
pub async fn insert<T>(&self, _table_name: &str, _rows: &[T]) -> Result<()> {
Err(Error::Connection("No backend enabled".to_string()))
}
}
#[derive(Default, Clone)]
pub struct ClientBuilder {
host: Option<String>,
port: Option<u16>,
database: Option<String>,
user: Option<String>,
password: Option<String>,
compression: Compression,
}
impl ClientBuilder {
pub fn host(mut self, host: impl Into<String>) -> Self {
self.host = Some(host.into());
self
}
pub fn port(mut self, port: u16) -> Self {
self.port = Some(port);
self
}
pub fn database(mut self, database: impl Into<String>) -> Self {
self.database = Some(database.into());
self
}
pub fn user(mut self, user: impl Into<String>) -> Self {
self.user = Some(user.into());
self
}
pub fn password(mut self, password: impl Into<String>) -> Self {
self.password = Some(password.into());
self
}
pub fn compression(mut self, compression: Compression) -> Self {
self.compression = compression;
self
}
#[cfg(feature = "clickhouse-backend")]
pub async fn build(self) -> Result<Client> {
let host = self.host.unwrap_or_else(|| "localhost".to_string());
let port = self.port.unwrap_or(8123); let database = self.database.unwrap_or_else(|| "default".to_string());
let user = self.user.unwrap_or_else(|| "default".to_string());
let password = self.password.unwrap_or_else(|| String::new());
let url = format!("http://{}:{}", host, port);
let client = ClickHouseClient::default()
.with_url(&url)
.with_user(&user)
.with_password(&password)
.with_database(&database);
let http_client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(30))
.pool_idle_timeout(std::time::Duration::from_secs(90))
.pool_max_idle_per_host(32)
.build()
.map_err(|e| Error::Connection(format!("Failed to build HTTP client: {}", e)))?;
Ok(Client {
inner: client,
http_client,
database,
url,
user,
password,
compression: self.compression,
})
}
#[cfg(not(feature = "clickhouse-backend"))]
pub async fn build(self) -> Result<Client> {
Err(Error::Connection("No backend enabled".to_string()))
}
}