pub mod otel_standard;
use std::time::Duration;
use fast_telemetry::otlp::pb;
use klickhouse::{Client, ClientOptions};
use tokio::time::{MissedTickBehavior, interval};
use tokio_util::sync::CancellationToken;
/// Connection and scheduling settings for the ClickHouse exporter.
///
/// `Debug` is implemented by hand rather than derived so that the password is
/// never written to logs or panic messages.
#[derive(Clone)]
pub struct ClickHouseConfig {
    /// Address of the ClickHouse server, passed as-is to the client when connecting.
    pub endpoint: String,
    /// User name sent when opening a connection.
    pub username: String,
    /// Password sent when opening a connection.
    pub password: String,
    /// Database used as the connection default and to qualify the target table.
    pub database: String,
    /// Time between two export attempts.
    pub interval: Duration,
}

impl std::fmt::Debug for ClickHouseConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ClickHouseConfig")
            .field("endpoint", &self.endpoint)
            .field("username", &self.username)
            // Never print the real secret; only signal that the field exists.
            .field("password", &"<redacted>")
            .field("database", &self.database)
            .field("interval", &self.interval)
            .finish()
    }
}
impl Default for ClickHouseConfig {
    /// Settings for a local server: `127.0.0.1:9000`, user `default` with an
    /// empty password, database `default`, exporting every 60 seconds.
    fn default() -> Self {
        let endpoint = String::from("127.0.0.1:9000");
        let username = String::from("default");
        let database = String::from("default");
        Self {
            endpoint,
            username,
            password: String::new(),
            database,
            interval: Duration::from_secs(60),
        }
    }
}
impl ClickHouseConfig {
    /// Creates a configuration for `endpoint`, taking every other field from
    /// the [`Default`] implementation.
    pub fn new(endpoint: impl Into<String>) -> Self {
        let endpoint = endpoint.into();
        Self {
            endpoint,
            ..Self::default()
        }
    }

    /// Returns the configuration with the user name and password replaced.
    pub fn with_credentials(
        self,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        Self {
            username: username.into(),
            password: password.into(),
            ..self
        }
    }

    /// Returns the configuration with the database replaced.
    pub fn with_database(self, database: impl Into<String>) -> Self {
        Self {
            database: database.into(),
            ..self
        }
    }

    /// Returns the configuration with the export interval replaced.
    pub fn with_interval(self, interval: Duration) -> Self {
        Self { interval, ..self }
    }
}
/// Upper bound applied to the exponential delay before jitter is added.
const MAX_BACKOFF: Duration = Duration::from_secs(300);
/// Delay used for the first failure; doubled for each further failure.
const BASE_BACKOFF: Duration = Duration::from_secs(5);

/// Computes how long to wait after `consecutive_failures` failed attempts.
///
/// The delay starts at [`BASE_BACKOFF`], doubles per additional failure (the
/// exponent is limited to 10) and is capped at [`MAX_BACKOFF`]. A symmetric
/// jitter of up to a quarter of that delay is then added or subtracted, so the
/// result may exceed the cap by up to 25%.
pub(crate) fn backoff_with_jitter(consecutive_failures: u32) -> Duration {
    let cap_ms = MAX_BACKOFF.as_millis() as u64;
    let shift = consecutive_failures.saturating_sub(1).min(10);
    let delay_ms = (BASE_BACKOFF.as_millis() as u64)
        .saturating_mul(1u64 << shift)
        .min(cap_ms);

    // Half-width of the jitter window; at least 1 ms so the modulus is never 1.
    let spread = (delay_ms / 4).max(1) as i64;

    // The sub-second part of the wall clock serves as a cheap, dependency-free
    // source of variation; a clock before the epoch simply yields zero.
    let clock_nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map(|since_epoch| since_epoch.subsec_nanos())
        .unwrap_or(0);

    // Map the nanos into [-spread, +spread].
    let offset = i64::from(clock_nanos) % (2 * spread + 1) - spread;
    let jittered_ms = (delay_ms as i64 + offset).max(0);
    Duration::from_millis(jittered_ms as u64)
}
/// Opens a client connection using the database named in `config`.
///
/// # Errors
///
/// Returns whatever error [`connect_with_database`] reports.
pub(crate) async fn connect(config: &ClickHouseConfig) -> klickhouse::Result<Client> {
    let database = config.database.as_str();
    connect_with_database(config, database).await
}
pub(crate) async fn connect_with_database(
config: &ClickHouseConfig,
database: &str,
) -> klickhouse::Result<Client> {
let opts = ClientOptions {
username: config.username.clone(),
password: config.password.clone(),
default_database: database.to_string(),
tcp_nodelay: true,
};
Client::connect(config.endpoint.as_str(), opts).await
}
/// Wraps `ident` in backticks so it can be used as a ClickHouse identifier.
///
/// Embedded backticks are doubled. Backslashes are doubled as well, because
/// ClickHouse interprets a backslash inside a quoted identifier as the start
/// of an escape sequence; left untouched, a backslash would alter the name or
/// swallow the closing quote.
pub(crate) fn quote_ident(ident: &str) -> String {
    // Two extra bytes for the surrounding backticks; escapes grow it as needed.
    let mut quoted = String::with_capacity(ident.len() + 2);
    quoted.push('`');
    for ch in ident.chars() {
        match ch {
            '`' => quoted.push_str("``"),
            '\\' => quoted.push_str("\\\\"),
            other => quoted.push(other),
        }
    }
    quoted.push('`');
    quoted
}
/// Builds the `database.table` reference with both parts quoted by
/// [`quote_ident`].
pub(crate) fn qualified_table(database: &str, table: &str) -> String {
    let quoted_database = quote_ident(database);
    let quoted_table = quote_ident(table);
    format!("{quoted_database}.{quoted_table}")
}
/// Runs the ClickHouse export loop until `cancel` is triggered.
///
/// On every tick of `config.interval` the metrics gathered by `collect_fn` are
/// converted to rows by `translator` and inserted into `table` inside
/// `config.database`.
///
/// Behaviour worth knowing:
///
/// * If the initial connection fails, the error is logged and the function
///   returns without ever entering the loop.
/// * After a failed insert or reconnect, each following attempt is delayed by
///   [`backoff_with_jitter`] in addition to the regular tick. The failure
///   counter is reset only by an export that wrote at least one row.
/// * A closed client is reconnected before the next export attempt.
/// * On cancellation (while waiting for a tick or during a backoff sleep) one
///   final export is attempted on the current client and its outcome is
///   logged; no reconnect is attempted at that point.
///
/// # Panics
///
/// `tokio::time::interval` is documented to panic when given a zero period, so
/// `config.interval` must be non-zero.
pub async fn run<R, F, T>(
    config: ClickHouseConfig,
    table: impl Into<String>,
    cancel: CancellationToken,
    mut collect_fn: F,
    mut translator: T,
) where
    R: klickhouse::Row + Send + Sync + 'static,
    F: FnMut(&mut Vec<pb::Metric>),
    T: FnMut(&pb::Metric) -> Vec<R>,
{
    let table = table.into();
    let query = format!(
        "INSERT INTO {} FORMAT native",
        qualified_table(&config.database, &table)
    );
    log::info!(
        "Starting ClickHouse exporter, endpoint={}, table={}.{}, interval={}s",
        config.endpoint,
        config.database,
        table,
        config.interval.as_secs()
    );

    let mut client = match connect(&config).await {
        Ok(c) => c,
        Err(e) => {
            log::error!(
                "Failed to connect to ClickHouse at {}: {e}",
                config.endpoint
            );
            return;
        }
    };

    let mut interval_timer = interval(config.interval);
    interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
    // A tokio interval completes its first tick immediately; consume it so the
    // first export happens one full interval after start-up.
    interval_timer.tick().await;

    let mut consecutive_failures: u32 = 0;
    // Reused across iterations so the collected metrics do not need a fresh
    // allocation on every export.
    let mut metrics_buf: Vec<pb::Metric> = Vec::new();

    // Every `break` below means "cancelled"; the final export after the loop
    // is the single shutdown path.
    loop {
        tokio::select! {
            _ = interval_timer.tick() => {}
            _ = cancel.cancelled() => { break; }
        }

        if consecutive_failures > 0 {
            let backoff = backoff_with_jitter(consecutive_failures);
            log::debug!(
                "ClickHouse export backing off {}ms (failures={consecutive_failures})",
                backoff.as_millis()
            );
            tokio::select! {
                _ = tokio::time::sleep(backoff) => {}
                _ = cancel.cancelled() => { break; }
            }
        }

        if client.is_closed() {
            match connect(&config).await {
                Ok(c) => {
                    log::info!("Reconnected to ClickHouse");
                    client = c;
                }
                Err(e) => {
                    consecutive_failures = consecutive_failures.saturating_add(1);
                    log::warn!("ClickHouse reconnect failed: {e}");
                    continue;
                }
            }
        }

        match export_once(
            &client,
            &query,
            &mut collect_fn,
            &mut translator,
            &mut metrics_buf,
        )
        .await
        {
            // Nothing was sent, so the connection was not exercised; leave the
            // failure counter untouched.
            Ok(0) => {}
            Ok(n) => {
                consecutive_failures = 0;
                log::debug!("Exported {n} rows to ClickHouse");
            }
            Err(e) => {
                consecutive_failures = consecutive_failures.saturating_add(1);
                log::warn!("ClickHouse insert failed: {e}");
            }
        }
    }

    log::info!("ClickHouse exporter shutting down, performing final export");
    // Best effort: a failure here cannot be retried, but it must not be silent.
    match export_once(
        &client,
        &query,
        &mut collect_fn,
        &mut translator,
        &mut metrics_buf,
    )
    .await
    {
        Ok(n) => log::debug!("Final export wrote {n} rows to ClickHouse"),
        Err(e) => log::warn!("ClickHouse final export failed: {e}"),
    }
}
/// Performs a single collect → translate → insert cycle.
///
/// `metrics_buf` is cleared and refilled by `collect_fn`; every collected
/// metric is passed to `translator` and the resulting rows are inserted with
/// `query` in one native block.
///
/// Returns the number of rows inserted, or `Ok(0)` without touching the
/// client when there were no metrics or no rows.
///
/// # Errors
///
/// Returns the error reported by `Client::insert_native_block`.
async fn export_once<R, F, T>(
    client: &Client,
    query: &str,
    collect_fn: &mut F,
    translator: &mut T,
    metrics_buf: &mut Vec<pb::Metric>,
) -> klickhouse::Result<usize>
where
    R: klickhouse::Row + Send + Sync + 'static,
    F: FnMut(&mut Vec<pb::Metric>),
    T: FnMut(&pb::Metric) -> Vec<R>,
{
    // Start from an empty buffer so metrics from the previous cycle are not
    // exported twice.
    metrics_buf.clear();
    collect_fn(metrics_buf);

    // An empty buffer yields no rows and never invokes the translator.
    let rows: Vec<R> = metrics_buf
        .iter()
        .flat_map(|metric| translator(metric))
        .collect();

    match rows.len() {
        0 => Ok(0),
        inserted => {
            client.insert_native_block(query, rows).await?;
            Ok(inserted)
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Plain identifiers are only wrapped; embedded backticks are doubled.
    #[test]
    fn quote_ident_escapes_backticks() {
        let cases = [("plain", "`plain`"), ("a`b", "`a``b`")];
        for (raw, expected) in cases {
            assert_eq!(quote_ident(raw), expected);
        }
    }

    /// The first failure waits the 5 s base delay, give or take 25% jitter.
    #[test]
    fn first_backoff_is_centered_on_base_delay() {
        let millis = backoff_with_jitter(1).as_millis();
        assert!(millis >= 3_750);
        assert!(millis <= 6_250);
    }
}