Skip to main content

fast_telemetry_export/clickhouse/
mod.rs

1//! ClickHouse native-protocol metrics exporter.
2//!
3//! Streams metrics directly into ClickHouse via the native TCP protocol
4//! (port 9000) using the [`klickhouse`] crate. The wire format is binary
5//! columnar with optional LZ4 compression — no JSON / HTTP overhead.
6//!
7//! Three layers are provided:
8//!
9//! - **First-party OTel-standard rows** ([`otel_standard::run_first_party`]) —
10//!   accepts a `fast_telemetry::clickhouse::ClickHouseMetricBatch` producer,
11//!   usually generated by `#[derive(ExportMetrics)]` plus `#[clickhouse]`.
12//!   This avoids building OTLP protobuf metrics before creating rows.
13//!
14//! - **Primitive [`run`]** — generic over a caller-supplied
15//!   [`klickhouse::Row`] type and translator closure
16//!   `FnMut(&pb::Metric) -> Vec<R>`. Use this when you want full control
17//!   over the schema, table layout, and per-metric translation.
18//!   Schema/migrations are the caller's responsibility.
19//!
20//! - **OTLP-to-OTel-standard schema** ([`otel_standard::run`]) — drop-in
21//!   exporter that writes to four tables compatible with the
22//!   [OpenTelemetry Collector ClickHouse exporter] metric schema
23//!   (`otel_metrics_sum`, `otel_metrics_gauge`, `otel_metrics_histogram`,
24//!   `otel_metrics_exponential_histogram`). Auto-creates the configured
25//!   database and tables on startup.
26//!
27//! All layers share [`ClickHouseConfig`] for connection settings and route
28//! through the same `connect` / backoff helpers — `otel_standard` costs
29//! nothing extra at runtime versus rolling your own with the primitive.
30//!
31//! [OpenTelemetry Collector ClickHouse exporter]: https://github.com/open-telemetry/opentelemetry-collector-contrib/tree/main/exporter/clickhouseexporter
32
33pub mod otel_standard;
34
35use std::time::Duration;
36
37use fast_telemetry::otlp::pb;
38use klickhouse::{Client, ClientOptions};
39use tokio::time::{MissedTickBehavior, interval};
40use tokio_util::sync::CancellationToken;
41
/// Connection-level configuration shared by every ClickHouse exporter in this
/// module.
///
/// `Debug` is implemented by hand (rather than derived) so that the password
/// is never written out when a configuration is logged or embedded in another
/// type's debug output.
#[derive(Clone)]
pub struct ClickHouseConfig {
    /// `host:port` of the ClickHouse native TCP endpoint (default `127.0.0.1:9000`).
    pub endpoint: String,
    /// Username (default `default`).
    pub username: String,
    /// Password (default empty). Redacted in the `Debug` representation.
    pub password: String,
    /// Database to insert into (default `default`).
    pub database: String,
    /// Export interval (default 60s).
    pub interval: Duration,
}

impl std::fmt::Debug for ClickHouseConfig {
    /// Formats every field except `password`, which is replaced by a fixed
    /// placeholder so credentials cannot leak through logs.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ClickHouseConfig")
            .field("endpoint", &self.endpoint)
            .field("username", &self.username)
            .field("password", &"<redacted>")
            .field("database", &self.database)
            .field("interval", &self.interval)
            .finish()
    }
}
57
58impl Default for ClickHouseConfig {
59    fn default() -> Self {
60        Self {
61            endpoint: "127.0.0.1:9000".to_string(),
62            username: "default".to_string(),
63            password: String::new(),
64            database: "default".to_string(),
65            interval: Duration::from_secs(60),
66        }
67    }
68}
69
70impl ClickHouseConfig {
71    pub fn new(endpoint: impl Into<String>) -> Self {
72        Self {
73            endpoint: endpoint.into(),
74            ..Default::default()
75        }
76    }
77
78    pub fn with_credentials(
79        mut self,
80        username: impl Into<String>,
81        password: impl Into<String>,
82    ) -> Self {
83        self.username = username.into();
84        self.password = password.into();
85        self
86    }
87
88    pub fn with_database(mut self, database: impl Into<String>) -> Self {
89        self.database = database.into();
90        self
91    }
92
93    pub fn with_interval(mut self, interval: Duration) -> Self {
94        self.interval = interval;
95        self
96    }
97}
98
/// Upper bound applied to the exponential delay *before* jitter is added.
const MAX_BACKOFF: Duration = Duration::from_secs(300);
/// Delay used after the first failure; doubled for each further failure.
const BASE_BACKOFF: Duration = Duration::from_secs(5);

/// Computes the delay to wait after `consecutive_failures` failed attempts.
///
/// The delay starts at [`BASE_BACKOFF`], doubles per additional failure (the
/// exponent is capped at 10) and is clamped to [`MAX_BACKOFF`]. A jitter of up
/// to ±25% of that value is then applied, so the result can exceed
/// [`MAX_BACKOFF`] by as much as a quarter. The jitter is derived from the
/// sub-second part of the wall clock rather than from a random number
/// generator.
pub(crate) fn backoff_with_jitter(consecutive_failures: u32) -> Duration {
    // 0 and 1 failures both map to the base delay.
    let shift = consecutive_failures.saturating_sub(1).min(10);
    let ceiling_ms = MAX_BACKOFF.as_millis() as u64;
    let center_ms = (BASE_BACKOFF.as_millis() as u64)
        .saturating_mul(1u64 << shift)
        .min(ceiling_ms);

    // A clock set before the Unix epoch simply yields zero here.
    let clock_nanos = std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |since_epoch| since_epoch.subsec_nanos());

    // Offset is drawn from the inclusive window [-spread, +spread].
    let spread = (center_ms / 4).max(1) as i64;
    let offset = i64::from(clock_nanos) % (2 * spread + 1) - spread;
    let jittered_ms = (center_ms as i64 + offset).max(0);

    Duration::from_millis(jittered_ms as u64)
}
117
118pub(crate) async fn connect(config: &ClickHouseConfig) -> klickhouse::Result<Client> {
119    connect_with_database(config, &config.database).await
120}
121
122pub(crate) async fn connect_with_database(
123    config: &ClickHouseConfig,
124    database: &str,
125) -> klickhouse::Result<Client> {
126    let opts = ClientOptions {
127        username: config.username.clone(),
128        password: config.password.clone(),
129        default_database: database.to_string(),
130        tcp_nodelay: true,
131    };
132    Client::connect(config.endpoint.as_str(), opts).await
133}
134
/// Wraps `ident` in backticks so it can be spliced into a ClickHouse query as
/// a single identifier.
///
/// Two characters are escaped inside the quotes:
///
/// - a backtick is doubled (`` ` `` becomes ` `` `), and
/// - a backslash is doubled (`\` becomes `\\`), because ClickHouse treats a
///   backslash inside a back-quoted identifier as the start of an escape
///   sequence. Left alone, a trailing backslash would swallow the closing
///   quote.
pub(crate) fn quote_ident(ident: &str) -> String {
    // +2 for the surrounding quotes; escapes may grow it further.
    let mut quoted = String::with_capacity(ident.len() + 2);
    quoted.push('`');
    for ch in ident.chars() {
        match ch {
            '`' => quoted.push_str("``"),
            '\\' => quoted.push_str("\\\\"),
            other => quoted.push(other),
        }
    }
    quoted.push('`');
    quoted
}
138
139pub(crate) fn qualified_table(database: &str, table: &str) -> String {
140    format!("{}.{}", quote_ident(database), quote_ident(table))
141}
142
143/// Run the generic, single-table ClickHouse export loop.
144///
145/// `collect_fn` populates a `Vec<pb::Metric>` each tick (typically by calling
146/// the derive-generated `export_otlp(out)` method on your metrics struct).
147/// `translator` is invoked once per metric and may return zero or more rows
148/// of any [`klickhouse::Row`] type; the rows are concatenated and inserted
149/// in a single native-protocol block per cycle.
150///
151/// Schema and migrations are the caller's responsibility — the table named
152/// by `table` must exist in `config.database` before the first insert,
153/// or the first export will fail (the loop will retry with backoff).
154///
155/// On cancellation, a final export is performed to flush pending metrics.
156///
157/// For multi-table schemas, spawn one [`run`] task per table; each gets its
158/// own connection.
159///
160/// # Example
161///
162/// ```ignore
163/// use std::sync::Arc;
164/// use std::time::Duration;
165///
166/// use fast_telemetry::otlp::pb;
167/// use fast_telemetry_export::clickhouse::{ClickHouseConfig, run};
168/// use klickhouse::{DateTime64, Tz};
169/// use tokio_util::sync::CancellationToken;
170///
171/// #[derive(klickhouse::Row, Debug)]
172/// #[allow(non_snake_case)]
173/// struct MyRow {
174///     MetricName: String,
175///     TimeUnix: DateTime64<9>,
176///     Value: f64,
177/// }
178///
179/// let metrics = Arc::new(MyMetrics::new());
180/// let cancel = CancellationToken::new();
181/// let config = ClickHouseConfig::new("127.0.0.1:9000")
182///     .with_database("telemetry")
183///     .with_interval(Duration::from_secs(30));
184///
185/// let m = metrics.clone();
186/// tokio::spawn(run(
187///     config,
188///     "my_metrics",
189///     cancel,
190///     move |out| m.export_otlp(out),
191///     |metric| match &metric.data {
192///         Some(pb::metric::Data::Sum(s)) => s
193///             .data_points
194///             .iter()
195///             .map(|dp| MyRow {
196///                 MetricName: metric.name.clone(),
197///                 TimeUnix: DateTime64::<9>(Tz::UTC, dp.time_unix_nano),
198///                 Value: 0.0,
199///             })
200///             .collect(),
201///         _ => Vec::new(),
202///     },
203/// ));
204/// ```
205pub async fn run<R, F, T>(
206    config: ClickHouseConfig,
207    table: impl Into<String>,
208    cancel: CancellationToken,
209    mut collect_fn: F,
210    mut translator: T,
211) where
212    R: klickhouse::Row + Send + Sync + 'static,
213    F: FnMut(&mut Vec<pb::Metric>),
214    T: FnMut(&pb::Metric) -> Vec<R>,
215{
216    let table = table.into();
217    let query = format!(
218        "INSERT INTO {} FORMAT native",
219        qualified_table(&config.database, &table)
220    );
221
222    log::info!(
223        "Starting ClickHouse exporter, endpoint={}, table={}.{}, interval={}s",
224        config.endpoint,
225        config.database,
226        table,
227        config.interval.as_secs()
228    );
229
230    let mut client = match connect(&config).await {
231        Ok(c) => c,
232        Err(e) => {
233            log::error!(
234                "Failed to connect to ClickHouse at {}: {e}",
235                config.endpoint
236            );
237            return;
238        }
239    };
240
241    let mut interval_timer = interval(config.interval);
242    interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
243    interval_timer.tick().await;
244
245    let mut consecutive_failures: u32 = 0;
246    let mut metrics_buf: Vec<pb::Metric> = Vec::new();
247
248    loop {
249        tokio::select! {
250            _ = interval_timer.tick() => {}
251            _ = cancel.cancelled() => {
252                log::info!("ClickHouse exporter shutting down, performing final export");
253                let _ = export_once(
254                    &client,
255                    &query,
256                    &mut collect_fn,
257                    &mut translator,
258                    &mut metrics_buf,
259                ).await;
260                return;
261            }
262        }
263
264        if consecutive_failures > 0 {
265            let backoff = backoff_with_jitter(consecutive_failures);
266            log::debug!(
267                "ClickHouse export backing off {}ms (failures={consecutive_failures})",
268                backoff.as_millis()
269            );
270            tokio::select! {
271                _ = tokio::time::sleep(backoff) => {}
272                _ = cancel.cancelled() => {
273                    let _ = export_once(
274                        &client,
275                        &query,
276                        &mut collect_fn,
277                        &mut translator,
278                        &mut metrics_buf,
279                    ).await;
280                    return;
281                }
282            }
283        }
284
285        if client.is_closed() {
286            match connect(&config).await {
287                Ok(c) => {
288                    log::info!("Reconnected to ClickHouse");
289                    client = c;
290                }
291                Err(e) => {
292                    consecutive_failures = consecutive_failures.saturating_add(1);
293                    log::warn!("ClickHouse reconnect failed: {e}");
294                    continue;
295                }
296            }
297        }
298
299        match export_once(
300            &client,
301            &query,
302            &mut collect_fn,
303            &mut translator,
304            &mut metrics_buf,
305        )
306        .await
307        {
308            Ok(0) => {}
309            Ok(n) => {
310                consecutive_failures = 0;
311                log::debug!("Exported {n} rows to ClickHouse");
312            }
313            Err(e) => {
314                consecutive_failures = consecutive_failures.saturating_add(1);
315                log::warn!("ClickHouse insert failed: {e}");
316            }
317        }
318    }
319}
320
321async fn export_once<R, F, T>(
322    client: &Client,
323    query: &str,
324    collect_fn: &mut F,
325    translator: &mut T,
326    metrics_buf: &mut Vec<pb::Metric>,
327) -> klickhouse::Result<usize>
328where
329    R: klickhouse::Row + Send + Sync + 'static,
330    F: FnMut(&mut Vec<pb::Metric>),
331    T: FnMut(&pb::Metric) -> Vec<R>,
332{
333    metrics_buf.clear();
334    collect_fn(metrics_buf);
335    if metrics_buf.is_empty() {
336        return Ok(0);
337    }
338
339    let mut rows: Vec<R> = Vec::new();
340    for m in metrics_buf.iter() {
341        rows.extend(translator(m));
342    }
343
344    if rows.is_empty() {
345        return Ok(0);
346    }
347
348    let count = rows.len();
349    client.insert_native_block(query, rows).await?;
350    Ok(count)
351}
352
#[cfg(test)]
mod tests {
    use super::*;

    /// Plain identifiers are only wrapped; embedded backticks are doubled.
    #[test]
    fn quote_ident_escapes_backticks() {
        let cases = [("plain", "`plain`"), ("a`b", "`a``b`")];
        for (raw, quoted) in cases {
            assert_eq!(quote_ident(raw), quoted);
        }
    }

    /// The first retry delay is the 5s base delay with at most ±25% jitter.
    #[test]
    fn first_backoff_is_centered_on_base_delay() {
        let expected = Duration::from_millis(3_750)..=Duration::from_millis(6_250);
        let backoff = backoff_with_jitter(1);
        assert!(expected.contains(&backoff));
    }
}