fast_telemetry_export/clickhouse/
mod.rs1pub mod otel_standard;
34
35use std::time::Duration;
36
37use fast_telemetry::otlp::pb;
38use klickhouse::{Client, ClientOptions};
39use tokio::time::{MissedTickBehavior, interval};
40use tokio_util::sync::CancellationToken;
41
/// Connection and scheduling settings for the ClickHouse exporter.
///
/// Start from [`ClickHouseConfig::new`] or [`Default`] and refine the result
/// with the `with_*` builder methods.
#[derive(Clone)]
pub struct ClickHouseConfig {
    /// Server address handed to `klickhouse::Client::connect`.
    pub endpoint: String,
    /// Username used to authenticate.
    pub username: String,
    /// Password used to authenticate; never printed by the `Debug` impl.
    pub password: String,
    /// Database selected as the connection default and used to qualify the
    /// exporter's target table.
    pub database: String,
    /// Time between export rounds.
    pub interval: Duration,
}

// Hand-written instead of derived so the password never ends up in logs or
// panic messages that format the config with `{:?}`.
impl std::fmt::Debug for ClickHouseConfig {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ClickHouseConfig")
            .field("endpoint", &self.endpoint)
            .field("username", &self.username)
            .field("password", &format_args!("<redacted>"))
            .field("database", &self.database)
            .field("interval", &self.interval)
            .finish()
    }
}

impl Default for ClickHouseConfig {
    /// Local server at `127.0.0.1:9000`, user `default` with an empty
    /// password, database `default`, exporting every 60 seconds.
    fn default() -> Self {
        Self {
            endpoint: "127.0.0.1:9000".to_string(),
            username: "default".to_string(),
            password: String::new(),
            database: "default".to_string(),
            interval: Duration::from_secs(60),
        }
    }
}

impl ClickHouseConfig {
    /// Creates a config for `endpoint`, taking every other field from
    /// [`Default`].
    #[must_use]
    pub fn new(endpoint: impl Into<String>) -> Self {
        Self {
            endpoint: endpoint.into(),
            ..Default::default()
        }
    }

    /// Replaces the username and password.
    #[must_use]
    pub fn with_credentials(
        mut self,
        username: impl Into<String>,
        password: impl Into<String>,
    ) -> Self {
        self.username = username.into();
        self.password = password.into();
        self
    }

    /// Replaces the target database.
    #[must_use]
    pub fn with_database(mut self, database: impl Into<String>) -> Self {
        self.database = database.into();
        self
    }

    /// Replaces the export interval.
    #[must_use]
    pub fn with_interval(mut self, interval: Duration) -> Self {
        self.interval = interval;
        self
    }
}
98
/// Cap applied to the nominal (pre-jitter) retry delay.
const MAX_BACKOFF: Duration = Duration::from_secs(300);
/// Nominal delay after the first failure; doubled for each further failure.
const BASE_BACKOFF: Duration = Duration::from_secs(5);

/// Returns how long to wait before retrying after `consecutive_failures`
/// failed attempts.
///
/// The nominal delay is `BASE_BACKOFF * 2^(consecutive_failures - 1)`, with the
/// exponent capped at 10 and `0` treated like `1`, clamped to `MAX_BACKOFF`.
/// A jitter of up to ±25% of the nominal delay (at least ±1ms) is then added.
/// The jitter is taken from the sub-second nanoseconds of the wall clock.
/// Because jitter is applied after clamping, the result can exceed
/// `MAX_BACKOFF` by up to 25%.
pub(crate) fn backoff_with_jitter(consecutive_failures: u32) -> Duration {
    let doublings = consecutive_failures.saturating_sub(1).min(10);
    let ceiling_ms = MAX_BACKOFF.as_millis() as u64;
    let nominal_ms = (BASE_BACKOFF.as_millis() as u64)
        .saturating_mul(1u64 << doublings)
        .min(ceiling_ms);

    let spread = (nominal_ms / 4).max(1) as i64;
    // A clock before the epoch yields a zero seed rather than an error.
    let seed = match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.subsec_nanos() as i64,
        Err(_) => 0,
    };
    // Maps the seed onto the closed range [-spread, +spread].
    let offset = seed % (2 * spread + 1) - spread;

    let total_ms = nominal_ms as i64 + offset;
    Duration::from_millis(if total_ms < 0 { 0 } else { total_ms as u64 })
}
117
118pub(crate) async fn connect(config: &ClickHouseConfig) -> klickhouse::Result<Client> {
119 connect_with_database(config, &config.database).await
120}
121
122pub(crate) async fn connect_with_database(
123 config: &ClickHouseConfig,
124 database: &str,
125) -> klickhouse::Result<Client> {
126 let opts = ClientOptions {
127 username: config.username.clone(),
128 password: config.password.clone(),
129 default_database: database.to_string(),
130 tcp_nodelay: true,
131 };
132 Client::connect(config.endpoint.as_str(), opts).await
133}
134
/// Wraps `ident` in backticks so it can be spliced into a ClickHouse query as
/// an identifier.
///
/// Embedded backticks are doubled. Backslashes are also doubled, because
/// ClickHouse's lexer treats `\` inside a quoted identifier as an escape for
/// the next character. An unescaped trailing `\` would otherwise swallow the
/// closing backtick and corrupt the query.
pub(crate) fn quote_ident(ident: &str) -> String {
    let mut quoted = String::with_capacity(ident.len() + 2);
    quoted.push('`');
    for ch in ident.chars() {
        match ch {
            '\\' => quoted.push_str("\\\\"),
            '`' => quoted.push_str("``"),
            other => quoted.push(other),
        }
    }
    quoted.push('`');
    quoted
}

/// Formats `database`.`table` with both parts quoted by [`quote_ident`].
pub(crate) fn qualified_table(database: &str, table: &str) -> String {
    format!("{}.{}", quote_ident(database), quote_ident(table))
}
142
143pub async fn run<R, F, T>(
206 config: ClickHouseConfig,
207 table: impl Into<String>,
208 cancel: CancellationToken,
209 mut collect_fn: F,
210 mut translator: T,
211) where
212 R: klickhouse::Row + Send + Sync + 'static,
213 F: FnMut(&mut Vec<pb::Metric>),
214 T: FnMut(&pb::Metric) -> Vec<R>,
215{
216 let table = table.into();
217 let query = format!(
218 "INSERT INTO {} FORMAT native",
219 qualified_table(&config.database, &table)
220 );
221
222 log::info!(
223 "Starting ClickHouse exporter, endpoint={}, table={}.{}, interval={}s",
224 config.endpoint,
225 config.database,
226 table,
227 config.interval.as_secs()
228 );
229
230 let mut client = match connect(&config).await {
231 Ok(c) => c,
232 Err(e) => {
233 log::error!(
234 "Failed to connect to ClickHouse at {}: {e}",
235 config.endpoint
236 );
237 return;
238 }
239 };
240
241 let mut interval_timer = interval(config.interval);
242 interval_timer.set_missed_tick_behavior(MissedTickBehavior::Skip);
243 interval_timer.tick().await;
244
245 let mut consecutive_failures: u32 = 0;
246 let mut metrics_buf: Vec<pb::Metric> = Vec::new();
247
248 loop {
249 tokio::select! {
250 _ = interval_timer.tick() => {}
251 _ = cancel.cancelled() => {
252 log::info!("ClickHouse exporter shutting down, performing final export");
253 let _ = export_once(
254 &client,
255 &query,
256 &mut collect_fn,
257 &mut translator,
258 &mut metrics_buf,
259 ).await;
260 return;
261 }
262 }
263
264 if consecutive_failures > 0 {
265 let backoff = backoff_with_jitter(consecutive_failures);
266 log::debug!(
267 "ClickHouse export backing off {}ms (failures={consecutive_failures})",
268 backoff.as_millis()
269 );
270 tokio::select! {
271 _ = tokio::time::sleep(backoff) => {}
272 _ = cancel.cancelled() => {
273 let _ = export_once(
274 &client,
275 &query,
276 &mut collect_fn,
277 &mut translator,
278 &mut metrics_buf,
279 ).await;
280 return;
281 }
282 }
283 }
284
285 if client.is_closed() {
286 match connect(&config).await {
287 Ok(c) => {
288 log::info!("Reconnected to ClickHouse");
289 client = c;
290 }
291 Err(e) => {
292 consecutive_failures = consecutive_failures.saturating_add(1);
293 log::warn!("ClickHouse reconnect failed: {e}");
294 continue;
295 }
296 }
297 }
298
299 match export_once(
300 &client,
301 &query,
302 &mut collect_fn,
303 &mut translator,
304 &mut metrics_buf,
305 )
306 .await
307 {
308 Ok(0) => {}
309 Ok(n) => {
310 consecutive_failures = 0;
311 log::debug!("Exported {n} rows to ClickHouse");
312 }
313 Err(e) => {
314 consecutive_failures = consecutive_failures.saturating_add(1);
315 log::warn!("ClickHouse insert failed: {e}");
316 }
317 }
318 }
319}
320
321async fn export_once<R, F, T>(
322 client: &Client,
323 query: &str,
324 collect_fn: &mut F,
325 translator: &mut T,
326 metrics_buf: &mut Vec<pb::Metric>,
327) -> klickhouse::Result<usize>
328where
329 R: klickhouse::Row + Send + Sync + 'static,
330 F: FnMut(&mut Vec<pb::Metric>),
331 T: FnMut(&pb::Metric) -> Vec<R>,
332{
333 metrics_buf.clear();
334 collect_fn(metrics_buf);
335 if metrics_buf.is_empty() {
336 return Ok(0);
337 }
338
339 let mut rows: Vec<R> = Vec::new();
340 for m in metrics_buf.iter() {
341 rows.extend(translator(m));
342 }
343
344 if rows.is_empty() {
345 return Ok(0);
346 }
347
348 let count = rows.len();
349 client.insert_native_block(query, rows).await?;
350 Ok(count)
351}
352
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn quote_ident_escapes_backticks() {
        let cases = [("plain", "`plain`"), ("a`b", "`a``b`")];
        for (input, expected) in cases {
            assert_eq!(quote_ident(input), expected, "quoting {input:?}");
        }
    }

    #[test]
    fn first_backoff_is_centered_on_base_delay() {
        // BASE_BACKOFF (5s) with ±25% jitter.
        let window = Duration::from_millis(3_750)..=Duration::from_millis(6_250);
        let backoff = backoff_with_jitter(1);
        assert!(
            window.contains(&backoff),
            "backoff {backoff:?} outside {window:?}"
        );
    }
}