use std::time::Duration;
use crate::{
config::ClickHouseConfig,
error::{Error, Result},
};
pub(crate) async fn create_client(config: &ClickHouseConfig) -> Result<clickhouse::Client> {
create_client_with_retries(config, config.max_retries).await
}
async fn create_client_with_retries(
config: &ClickHouseConfig,
max_retries: u32,
) -> Result<clickhouse::Client> {
let mut attempt = 0;
let base_delay = Duration::from_secs(config.retry_delay_secs);
loop {
match try_create_client(config).await {
Ok(client) => {
if attempt > 0 {
tracing::info!(
"ClickHouse connection verified after {} attempt(s)",
attempt + 1
);
} else {
tracing::info!("ClickHouse client connected to {}", config.url);
}
return Ok(client);
}
Err(e) => {
attempt += 1;
if attempt > max_retries {
tracing::error!(
"Failed to connect to ClickHouse after {} attempts: {}",
max_retries + 1,
e
);
return Err(e);
}
let delay_multiplier = 2_u32.pow(attempt.saturating_sub(1));
let delay = base_delay * delay_multiplier;
tracing::warn!(
"ClickHouse connection attempt {} failed: {}. Retrying in {:?}...",
attempt,
e,
delay
);
tokio::time::sleep(delay).await;
}
}
}
}
async fn try_create_client(config: &ClickHouseConfig) -> Result<clickhouse::Client> {
let mut client = clickhouse::Client::default()
.with_url(&config.url)
.with_database(&config.database);
if let Some(ref user) = config.username {
client = client.with_user(user);
}
if let Some(ref pass) = config.password {
client = client.with_password(pass);
}
client
.query("SELECT 1")
.execute()
.await
.map_err(|e| {
Error::ClickHouse(format!(
"Failed to connect to ClickHouse at '{}'\n\n\
Troubleshooting:\n\
1. Verify ClickHouse server is running\n\
2. Check HTTP interface is enabled (default port 8123)\n\
3. Verify credentials and database name\n\
4. Check network connectivity\n\n\
Error: {}",
config.url, e
))
})?;
Ok(client)
}
pub(crate) fn sanitize_url(url: &str) -> String {
if let Some(at_pos) = url.find('@') {
if let Some(scheme_end) = url.find("://") {
return format!("{}://***@{}", &url[..scheme_end], &url[at_pos + 1..]);
}
}
url.to_string()
}
#[async_trait::async_trait]
pub trait AnalyticsWriter<T>: Send + Sync
where
T: clickhouse::Row + clickhouse::RowOwned + clickhouse::RowWrite + serde::Serialize + Send + Sync,
{
fn client(&self) -> &clickhouse::Client;
fn table_name(&self) -> &str;
async fn write_one(&self, row: T) -> Result<()> {
let mut insert: clickhouse::insert::Insert<T> = self
.client()
.insert(self.table_name())
.await
.map_err(|e| Error::ClickHouse(format!("Failed to create insert: {}", e)))?;
insert
.write(&row)
.await
.map_err(|e| Error::ClickHouse(format!("Failed to write row: {}", e)))?;
insert
.end()
.await
.map_err(|e| Error::ClickHouse(format!("Failed to flush insert: {}", e)))?;
Ok(())
}
async fn write_batch(&self, rows: Vec<T>) -> Result<()> {
if rows.is_empty() {
return Ok(());
}
let mut insert: clickhouse::insert::Insert<T> = self
.client()
.insert(self.table_name())
.await
.map_err(|e| Error::ClickHouse(format!("Failed to create insert: {}", e)))?;
for row in &rows {
insert
.write(row)
.await
.map_err(|e| Error::ClickHouse(format!("Failed to write row: {}", e)))?;
}
insert
.end()
.await
.map_err(|e| Error::ClickHouse(format!("Failed to flush batch: {}", e)))?;
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_config_deserialize_applies_all_defaults() {
let json = r#"{"url": "http://ch:8123"}"#;
let config: ClickHouseConfig = serde_json::from_str(json).unwrap();
assert_eq!(config.database, "default");
assert!(config.username.is_none());
assert!(config.password.is_none());
assert_eq!(config.max_retries, 5);
assert_eq!(config.retry_delay_secs, 2);
assert!(!config.optional);
assert!(config.lazy_init);
}
#[test]
fn test_config_serde_roundtrip_preserves_all_fields() {
let config = ClickHouseConfig {
url: "https://ch.prod:8443".to_string(),
database: "analytics".to_string(),
username: Some("admin".to_string()),
password: Some("s3cret".to_string()),
max_retries: 10,
retry_delay_secs: 30,
optional: true,
lazy_init: false,
};
let json = serde_json::to_string(&config).unwrap();
let roundtripped: ClickHouseConfig = serde_json::from_str(&json).unwrap();
assert_eq!(roundtripped.url, config.url);
assert_eq!(roundtripped.database, config.database);
assert_eq!(roundtripped.username, config.username);
assert_eq!(roundtripped.password, config.password);
assert_eq!(roundtripped.max_retries, config.max_retries);
assert_eq!(roundtripped.retry_delay_secs, config.retry_delay_secs);
assert_eq!(roundtripped.optional, config.optional);
assert_eq!(roundtripped.lazy_init, config.lazy_init);
}
#[test]
fn test_config_overrides_beat_defaults() {
let json = r#"{
"url": "http://ch:8123",
"database": "events",
"max_retries": 0,
"retry_delay_secs": 60,
"optional": true,
"lazy_init": false
}"#;
let config: ClickHouseConfig = serde_json::from_str(json).unwrap();
assert_eq!(config.database, "events");
assert_eq!(config.max_retries, 0);
assert_eq!(config.retry_delay_secs, 60);
assert!(config.optional);
assert!(!config.lazy_init);
}
#[test]
fn test_sanitize_url_redacts_http_credentials() {
let sanitized = sanitize_url("http://user:pass@localhost:8123");
assert_eq!(sanitized, "http://***@localhost:8123");
}
#[test]
fn test_sanitize_url_redacts_https_credentials() {
let sanitized = sanitize_url("https://admin:s3cret@ch.example.com:8443/db");
assert!(!sanitized.contains("admin"));
assert!(!sanitized.contains("s3cret"));
assert!(sanitized.contains("ch.example.com:8443/db"));
}
#[test]
fn test_sanitize_url_passthrough_when_no_credentials() {
let url = "http://localhost:8123";
assert_eq!(sanitize_url(url), url);
}
#[test]
fn test_sanitize_url_no_scheme_leaves_credentials_visible() {
let url = "user:pass@localhost:8123";
assert_eq!(sanitize_url(url), url);
}
#[test]
fn test_sanitize_url_handles_empty_string() {
assert_eq!(sanitize_url(""), "");
}
#[test]
fn test_clickhouse_error_maps_to_500_with_analytics_code() {
use axum::response::IntoResponse;
let err = Error::ClickHouse("connection refused".to_string());
let response = err.into_response();
assert_eq!(
response.status(),
axum::http::StatusCode::INTERNAL_SERVER_ERROR,
"ClickHouse errors should be 500, not exposed as client errors"
);
}
#[tokio::test]
async fn test_clickhouse_error_response_body_contains_analytics_code() {
use axum::response::IntoResponse;
let err = Error::ClickHouse("query failed".to_string());
let response = err.into_response();
let body = axum::body::to_bytes(response.into_body(), usize::MAX)
.await
.unwrap();
let json: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(json["code"], "ANALYTICS_ERROR");
assert!(
!json["error"].as_str().unwrap().contains("query failed"),
"Internal error details should not be exposed in response body"
);
}
#[test]
fn test_clickhouse_error_does_not_leak_internal_message() {
let err = Error::ClickHouse(
"DB::Exception: Table default.audit_events doesn't exist".to_string(),
);
let display = err.to_string();
assert!(display.contains("doesn't exist"));
}
}