fast-telemetry-export 0.3.0

Export adapters for fast-telemetry metrics: DogStatsD, OTLP, ClickHouse, span export, and stale-series sweeping.
Documentation
//! ClickHouse native-protocol metrics exporter.
//!
//! Streams metrics directly into ClickHouse via the native TCP protocol
//! (port 9000) using the [`klickhouse`] crate. The wire format is binary
//! columnar with optional LZ4 compression — no JSON / HTTP overhead.
//!
//! Three layers are provided:
//!
//! - **First-party OTel-standard rows** ([`otel_standard::run_first_party`]) —
//!   accepts a `fast_telemetry::clickhouse::ClickHouseMetricBatch` producer,
//!   usually generated by `#[derive(ExportMetrics)]` plus `#[clickhouse]`.
//!   This avoids building OTLP protobuf metrics before creating rows.
//!
//! - **Primitive [`run`]** — generic over a caller-supplied
//!   [`klickhouse::Row`] type and translator closure
//!   `FnMut(&pb::Metric) -> Vec<R>`. Use this when you want full control
//!   over the schema, table layout, and per-metric translation.
//!   Schema/migrations are the caller's responsibility.
//!
//! - **OTLP-to-OTel-standard schema** ([`otel_standard::run`]) — drop-in
//!   exporter that writes to four tables compatible with the
//!   [OpenTelemetry Collector ClickHouse exporter] metric schema
//!   (`otel_metrics_sum`, `otel_metrics_gauge`, `otel_metrics_histogram`,
//!   `otel_metrics_exponential_histogram`). Auto-creates the configured
//!   database and tables on startup.
//!
//! All layers share [`ClickHouseConfig`] for connection settings and route
//! through the same `connect` / backoff helpers — `otel_standard` costs
//! nothing extra at runtime versus rolling your own with the primitive.
//!
//! [OpenTelemetry Collector ClickHouse exporter]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/clickhouseexporter

pub mod otel_standard;

use std::time::Duration;

use fast_telemetry::otlp::pb;
use klickhouse::{Client, ClientOptions};
use tokio::time::{MissedTickBehavior, interval};
use tokio_util::sync::CancellationToken;

/// Connection-level settings shared by every ClickHouse exporter in this
/// module.
///
/// Construct via [`ClickHouseConfig::new`] or [`Default::default`], then
/// chain the `with_*` builder methods to override individual fields.
#[derive(Clone)]
pub struct ClickHouseConfig {
    /// `host:port` of the ClickHouse native TCP endpoint (default `127.0.0.1:9000`).
    pub endpoint: String,
    /// Username (default `default`).
    pub username: String,
    /// Password (default empty).
    pub password: String,
    /// Database to insert into (default `default`).
    pub database: String,
    /// Export interval (default 60s).
    pub interval: Duration,
}

impl Default for ClickHouseConfig {
    fn default() -> Self {
        ClickHouseConfig {
            endpoint: String::from("127.0.0.1:9000"),
            username: String::from("default"),
            database: String::from("default"),
            password: String::new(),
            interval: Duration::from_secs(60),
        }
    }
}

impl ClickHouseConfig {
    /// Config pointing at `endpoint`, with every other field defaulted.
    pub fn new(endpoint: impl Into<String>) -> Self {
        let mut config = Self::default();
        config.endpoint = endpoint.into();
        config
    }

    /// Set the username and password used when connecting.
    pub fn with_credentials(
        self,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        Self {
            username: username.into(),
            password: password.into(),
            ..self
        }
    }

    /// Set the database that inserts are routed to.
    pub fn with_database(self, database: impl Into<String>) -> Self {
        Self {
            database: database.into(),
            ..self
        }
    }

    /// Set the export interval.
    pub fn with_interval(self, interval: Duration) -> Self {
        Self { interval, ..self }
    }
}

/// Longest delay between retry attempts.
const MAX_BACKOFF: Duration = Duration::from_secs(300);
/// Delay after the first failure; doubles with each subsequent failure.
const BASE_BACKOFF: Duration = Duration::from_secs(5);

/// Jittered exponential backoff for the given consecutive-failure count.
///
/// The delay starts at [`BASE_BACKOFF`] for the first failure and doubles
/// per additional failure (the exponent is clamped so the shift cannot
/// overflow), capped at [`MAX_BACKOFF`]. A ±25% jitter derived from the
/// system clock's sub-second nanoseconds is applied so that many instances
/// failing together do not retry in lockstep.
pub(crate) fn backoff_with_jitter(consecutive_failures: u32) -> Duration {
    let doublings = consecutive_failures.saturating_sub(1).min(10);
    let capped_ms = (BASE_BACKOFF.as_millis() as u64)
        .saturating_mul(1u64 << doublings)
        .min(MAX_BACKOFF.as_millis() as u64);

    // Cheap, dependency-free entropy source: wall-clock sub-second nanos.
    let entropy = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .unwrap_or_default()
        .subsec_nanos() as i64;
    let half_span = (capped_ms / 4).max(1) as i64;
    let offset = entropy % (half_span * 2 + 1) - half_span;
    Duration::from_millis((capped_ms as i64 + offset).max(0) as u64)
}

/// Connect to `config.endpoint`, defaulting to the configured database.
pub(crate) async fn connect(config: &ClickHouseConfig) -> klickhouse::Result<Client> {
    let database = config.database.as_str();
    connect_with_database(config, database).await
}

pub(crate) async fn connect_with_database(
    config: &ClickHouseConfig,
    database: &str,
) -> klickhouse::Result<Client> {
    let opts = ClientOptions {
        username: config.username.clone(),
        password: config.password.clone(),
        default_database: database.to_string(),
        tcp_nodelay: true,
    };
    Client::connect(config.endpoint.as_str(), opts).await
}

/// Backtick-quote a ClickHouse identifier, doubling embedded backticks.
pub(crate) fn quote_ident(ident: &str) -> String {
    let escaped = ident.replace('`', "``");
    let mut quoted = String::with_capacity(escaped.len() + 2);
    quoted.push('`');
    quoted.push_str(&escaped);
    quoted.push('`');
    quoted
}

/// Build a fully-qualified, quoted `database`.`table` reference.
pub(crate) fn qualified_table(database: &str, table: &str) -> String {
    [quote_ident(database), quote_ident(table)].join(".")
}

/// Run the generic, single-table ClickHouse export loop.
///
/// `collect_fn` populates a `Vec<pb::Metric>` each tick (typically by calling
/// the derive-generated `export_otlp(out)` method on your metrics struct).
/// `translator` is invoked once per metric and may return zero or more rows
/// of any [`klickhouse::Row`] type; the rows are concatenated and inserted
/// in a single native-protocol block per cycle.
///
/// Schema and migrations are the caller's responsibility — the table named
/// by `table` must exist in `config.database` before the first insert,
/// or the first export will fail (the loop will retry with backoff).
///
/// Connection failures — including the initial connection — are retried
/// with jittered exponential backoff rather than terminating the task, so
/// the exporter tolerates ClickHouse becoming reachable after this service
/// starts.
///
/// On cancellation, a final export is performed to flush pending metrics.
///
/// For multi-table schemas, spawn one [`run`] task per table; each gets its
/// own connection.
///
/// # Example
///
/// ```ignore
/// use std::sync::Arc;
/// use std::time::Duration;
///
/// use fast_telemetry::otlp::pb;
/// use fast_telemetry_export::clickhouse::{ClickHouseConfig, run};
/// use klickhouse::{DateTime64, Tz};
/// use tokio_util::sync::CancellationToken;
///
/// #[derive(klickhouse::Row, Debug)]
/// #[allow(non_snake_case)]
/// struct MyRow {
///     MetricName: String,
///     TimeUnix: DateTime64<9>,
///     Value: f64,
/// }
///
/// let metrics = Arc::new(MyMetrics::new());
/// let cancel = CancellationToken::new();
/// let config = ClickHouseConfig::new("127.0.0.1:9000")
///     .with_database("telemetry")
///     .with_interval(Duration::from_secs(30));
///
/// let m = metrics.clone();
/// tokio::spawn(run(
///     config,
///     "my_metrics",
///     cancel,
///     move |out| m.export_otlp(out),
///     |metric| match &metric.data {
///         Some(pb::metric::Data::Sum(s)) => s
///             .data_points
///             .iter()
///             .map(|dp| MyRow {
///                 MetricName: metric.name.clone(),
///                 TimeUnix: DateTime64::<9>(Tz::UTC, dp.time_unix_nano),
///                 Value: 0.0,
///             })
///             .collect(),
///         _ => Vec::new(),
///     },
/// ));
/// ```
pub async fn run<R, F, T>(
    config: ClickHouseConfig,
    table: impl Into<String>,
    cancel: CancellationToken,
    mut collect_fn: F,
    mut translator: T,
) where
    R: klickhouse::Row + Send + Sync + 'static,
    F: FnMut(&mut Vec<pb::Metric>),
    T: FnMut(&pb::Metric) -> Vec<R>,
{
    let table = table.into();
    // Identifiers are backtick-quoted; values travel in the native block, so
    // the statement text itself never embeds user data.
    let query = format!(
        "INSERT INTO {} FORMAT native",
        qualified_table(&config.database, &table)
    );

    log::info!(
        "Starting ClickHouse exporter, endpoint={}, table={}.{}, interval={}s",
        config.endpoint,
        config.database,
        table,
        config.interval.as_secs()
    );

    // Establish the initial connection, retrying with backoff. Previously a
    // single failed connect (e.g. ClickHouse not yet up at service start)
    // returned immediately and silently disabled the exporter for the
    // lifetime of the process.
    let mut consecutive_failures: u32 = 0;
    let mut client = loop {
        match connect(&config).await {
            Ok(c) => break c,
            Err(e) => {
                consecutive_failures = consecutive_failures.saturating_add(1);
                let backoff = backoff_with_jitter(consecutive_failures);
                log::warn!(
                    "Failed to connect to ClickHouse at {}: {e}; retrying in {}ms",
                    config.endpoint,
                    backoff.as_millis()
                );
                tokio::select! {
                    _ = tokio::time::sleep(backoff) => {}
                    _ = cancel.cancelled() => {
                        log::info!("ClickHouse exporter cancelled before first connection");
                        return;
                    }
                }
            }
        }
    };
    consecutive_failures = 0;

    let mut interval_timer = interval(config.interval);
    // Skip (rather than burst) ticks missed while an export stalls longer
    // than the interval.
    interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
    // Consume the immediate first tick so the first export happens one full
    // interval after startup.
    interval_timer.tick().await;

    let mut metrics_buf: Vec<pb::Metric> = Vec::new();

    loop {
        tokio::select! {
            _ = interval_timer.tick() => {}
            _ = cancel.cancelled() => {
                log::info!("ClickHouse exporter shutting down, performing final export");
                // Best-effort flush; errors are irrelevant at shutdown.
                let _ = export_once(
                    &client,
                    &query,
                    &mut collect_fn,
                    &mut translator,
                    &mut metrics_buf,
                ).await;
                return;
            }
        }

        // After a failed cycle, wait out the backoff before touching the
        // connection again — but still flush on cancellation.
        if consecutive_failures > 0 {
            let backoff = backoff_with_jitter(consecutive_failures);
            log::debug!(
                "ClickHouse export backing off {}ms (failures={consecutive_failures})",
                backoff.as_millis()
            );
            tokio::select! {
                _ = tokio::time::sleep(backoff) => {}
                _ = cancel.cancelled() => {
                    let _ = export_once(
                        &client,
                        &query,
                        &mut collect_fn,
                        &mut translator,
                        &mut metrics_buf,
                    ).await;
                    return;
                }
            }
        }

        // Re-establish the connection if the server (or a network blip)
        // closed it since the last cycle.
        if client.is_closed() {
            match connect(&config).await {
                Ok(c) => {
                    log::info!("Reconnected to ClickHouse");
                    client = c;
                }
                Err(e) => {
                    consecutive_failures = consecutive_failures.saturating_add(1);
                    log::warn!("ClickHouse reconnect failed: {e}");
                    continue;
                }
            }
        }

        match export_once(
            &client,
            &query,
            &mut collect_fn,
            &mut translator,
            &mut metrics_buf,
        )
        .await
        {
            // Nothing to send; deliberately does not reset the failure
            // counter, since no insert was proven to work.
            Ok(0) => {}
            Ok(n) => {
                consecutive_failures = 0;
                log::debug!("Exported {n} rows to ClickHouse");
            }
            Err(e) => {
                consecutive_failures = consecutive_failures.saturating_add(1);
                log::warn!("ClickHouse insert failed: {e}");
            }
        }
    }
}

/// Collect one batch of metrics, translate them to rows, and insert the
/// rows as a single native-protocol block.
///
/// Returns the number of rows written. `Ok(0)` means there was nothing to
/// send (no metrics collected, or the translator produced no rows for any
/// of them) and no insert was attempted.
async fn export_once<R, F, T>(
    client: &Client,
    query: &str,
    collect_fn: &mut F,
    translator: &mut T,
    metrics_buf: &mut Vec<pb::Metric>,
) -> klickhouse::Result<usize>
where
    R: klickhouse::Row + Send + Sync + 'static,
    F: FnMut(&mut Vec<pb::Metric>),
    T: FnMut(&pb::Metric) -> Vec<R>,
{
    // Reuse the caller-owned buffer so steady-state cycles don't reallocate.
    metrics_buf.clear();
    collect_fn(metrics_buf);

    let rows: Vec<R> = metrics_buf
        .iter()
        .flat_map(|metric| translator(metric))
        .collect();
    if rows.is_empty() {
        return Ok(0);
    }

    let row_count = rows.len();
    client.insert_native_block(query, rows).await?;
    Ok(row_count)
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn quote_ident_escapes_backticks() {
        for (input, expected) in [("plain", "`plain`"), ("a`b", "`a``b`")] {
            assert_eq!(quote_ident(input), expected);
        }
    }

    #[test]
    fn first_backoff_is_centered_on_base_delay() {
        // One failure → 5s base delay with at most ±25% jitter.
        let backoff = backoff_with_jitter(1);
        let floor = Duration::from_millis(3_750);
        let ceiling = Duration::from_millis(6_250);
        assert!(backoff >= floor, "{backoff:?} below jitter floor");
        assert!(backoff <= ceiling, "{backoff:?} above jitter ceiling");
    }
}