use crate::{E2eError, Result};
use clickhouse::{Client, Row};
use serde::Deserialize;
use tracing::info;
pub struct ClickHouseResource {
client: Client,
admin_client: Client,
pub database: String,
url: String,
should_drop: bool,
}
impl ClickHouseResource {
pub async fn connect_existing(url: &str, database: &str) -> Result<Self> {
let client = Client::default().with_url(url).with_database(database);
let admin_client = Client::default().with_url(url).with_database("default");
info!("Connected to existing ClickHouse database: {}", database);
Ok(Self {
client,
admin_client,
database: database.to_string(),
url: url.to_string(),
should_drop: false, })
}
pub async fn new(url: &str, database: &str) -> Result<Self> {
let admin_client = Client::default().with_url(url).with_database("default");
admin_client
.query(&format!("CREATE DATABASE IF NOT EXISTS `{}`", database))
.execute()
.await
.map_err(|e| E2eError::ClickHouse(e.to_string()))?;
info!("Created ClickHouse database: {}", database);
let client = Client::default().with_url(url).with_database(database);
Ok(Self {
client,
admin_client,
database: database.to_string(),
url: url.to_string(),
should_drop: true, })
}
pub fn client(&self) -> &Client {
&self.client
}
pub async fn execute(&self, sql: &str) -> Result<()> {
self.client
.query(sql)
.execute()
.await
.map_err(|e| E2eError::ClickHouse(e.to_string()))?;
Ok(())
}
pub async fn count(&self, query: &str) -> Result<u64> {
let count: u64 = self
.client
.query(query)
.fetch_one()
.await
.map_err(|e| E2eError::ClickHouse(e.to_string()))?;
Ok(count)
}
pub async fn query<T>(&self, query: &str) -> Result<Vec<T>>
where
T: Row + for<'a> Deserialize<'a>,
{
let rows: Vec<T> = self
.client
.query(query)
.fetch_all()
.await
.map_err(|e| E2eError::ClickHouse(e.to_string()))?;
Ok(rows)
}
pub async fn query_one<T>(&self, query: &str) -> Result<T>
where
T: Row + for<'a> Deserialize<'a>,
{
let row: T = self
.client
.query(query)
.fetch_one()
.await
.map_err(|e| E2eError::ClickHouse(e.to_string()))?;
Ok(row)
}
pub async fn get_column_types(&self, table: &str) -> Result<Vec<(String, String)>> {
#[derive(Row, Deserialize)]
struct ColumnInfo {
name: String,
#[serde(rename = "type")]
col_type: String,
}
let query = format!(
"SELECT name, type FROM system.columns WHERE database = '{}' AND table = '{}' ORDER BY position",
self.database, table
);
let columns: Vec<ColumnInfo> = self
.client
.query(&query)
.fetch_all()
.await
.map_err(|e| E2eError::ClickHouse(e.to_string()))?;
Ok(columns.into_iter().map(|c| (c.name, c.col_type)).collect())
}
pub fn connection_url(&self) -> String {
format!("{}?database={}", self.url, self.database)
}
pub async fn list_tables(&self) -> Result<Vec<String>> {
#[derive(Row, Deserialize)]
struct TableName {
name: String,
}
let tables: Vec<TableName> = self
.query(&format!(
"SELECT name FROM system.tables WHERE database = '{}' ORDER BY name",
self.database
))
.await?;
Ok(tables.into_iter().map(|t| t.name).collect())
}
pub async fn get_sample_data_formatted(&self, table: &str, limit: usize) -> Result<String> {
let sample_query = format!(
"SELECT * FROM {}.{} LIMIT {} FORMAT PrettyCompact",
self.database, table, limit
);
let response = reqwest::Client::new()
.post(&self.url)
.query(&[("database", self.database.as_str())])
.body(sample_query)
.send()
.await
.map_err(|e| E2eError::ClickHouse(e.to_string()))?;
if response.status().is_success() {
let text = response
.text()
.await
.map_err(|e| E2eError::ClickHouse(e.to_string()))?;
Ok(text)
} else {
let status = response.status();
let error_text = response.text().await.unwrap_or_default();
Err(E2eError::ClickHouse(format!(
"HTTP {}: {}",
status, error_text
)))
}
}
}
impl Drop for ClickHouseResource {
fn drop(&mut self) {
if !self.should_drop {
return;
}
if let Ok(handle) = tokio::runtime::Handle::try_current() {
let database = self.database.clone();
let admin_client = self.admin_client.clone();
handle.spawn(async move {
let drop_sql = format!("DROP DATABASE IF EXISTS `{}`", database);
if let Err(e) = admin_client.query(&drop_sql).execute().await {
tracing::warn!("Failed to drop ClickHouse database {}: {}", database, e);
} else {
info!("Dropped ClickHouse database: {}", database);
}
});
}
}
}