1use arrow_array::RecordBatch;
32use arrow_ipc::writer::StreamWriter;
33use rand::Rng;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::sync::{mpsc, Arc, Mutex};
36use std::thread::{self, JoinHandle};
37use std::time::Duration;
38use thiserror::Error;
39use tracing::{debug, error, info, warn};
40
41#[derive(Debug, Error)]
43pub enum ClickHouseError {
44 #[error("invalid writer configuration: {0}")]
46 Config(String),
47 #[error("writer channel closed: {0}")]
49 ChannelClosed(String),
50 #[error("HTTP request failed: {0}")]
52 Http(String),
53 #[error("Arrow serialization failed: {0}")]
55 Arrow(#[from] arrow_schema::ArrowError),
56}
57
/// Connection, retry and threading settings for the ClickHouse writer.
///
/// `Debug` is implemented by hand so that the password is never printed.
#[derive(Clone)]
pub struct ClickHouseConfig {
    /// Base URL of the ClickHouse HTTP endpoint, e.g. `http://localhost:8123`.
    pub url: String,
    /// Target table used in the `INSERT INTO ... FORMAT ArrowStream` statement.
    pub table: String,
    /// Optional database, sent as the `X-ClickHouse-Database` header.
    pub database: Option<String>,
    /// Optional user name, sent as the `X-ClickHouse-User` header.
    pub username: Option<String>,
    /// Optional password, sent as the `X-ClickHouse-Key` header.
    pub password: Option<String>,
    /// Upper bound on rows per batch.
    // NOTE(review): not read anywhere in the visible part of this file — presumably
    // enforced by the producer side; confirm against callers.
    pub max_batch_rows: usize,
    /// Total number of send attempts per batch (the first try included).
    /// Must be positive; the writer constructor rejects `0`.
    pub max_retries: u32,
    /// Base delay of the exponential backoff between attempts.
    pub base_retry_delay: Duration,
    /// Cap applied to the exponential backoff delay.
    pub max_retry_delay: Duration,
    /// Timeout for establishing the HTTP connection.
    pub connect_timeout: Duration,
    /// Timeout for reading the HTTP response.
    pub read_timeout: Duration,
    /// Request gzip compression of the request body. Only effective when the
    /// crate is built with the `gzip` feature.
    pub gzip: bool,
    /// Number of worker threads; values below 1 are treated as 1.
    pub workers: usize,
    /// Directory that receives batches which failed every attempt. Only used
    /// when the crate is built with the `spill` feature.
    pub spill_dir: Option<std::path::PathBuf>,
}

impl std::fmt::Debug for ClickHouseConfig {
    /// Formats every field like the derived implementation would, except that
    /// a configured password is shown as `<redacted>` so it cannot leak into logs.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let password = self.password.as_ref().map(|_| "<redacted>");
        f.debug_struct("ClickHouseConfig")
            .field("url", &self.url)
            .field("table", &self.table)
            .field("database", &self.database)
            .field("username", &self.username)
            .field("password", &password)
            .field("max_batch_rows", &self.max_batch_rows)
            .field("max_retries", &self.max_retries)
            .field("base_retry_delay", &self.base_retry_delay)
            .field("max_retry_delay", &self.max_retry_delay)
            .field("connect_timeout", &self.connect_timeout)
            .field("read_timeout", &self.read_timeout)
            .field("gzip", &self.gzip)
            .field("workers", &self.workers)
            .field("spill_dir", &self.spill_dir)
            .finish()
    }
}

impl Default for ClickHouseConfig {
    /// Local, unauthenticated, single-worker configuration without gzip or spilling.
    fn default() -> Self {
        Self {
            url: String::from("http://localhost:8123"),
            table: String::from("agent_data"),
            database: None,
            username: None,
            password: None,
            max_batch_rows: 100_000,
            max_retries: 3,
            base_retry_delay: Duration::from_millis(100),
            max_retry_delay: Duration::from_secs(10),
            connect_timeout: Duration::from_secs(10),
            read_timeout: Duration::from_secs(60),
            gzip: false,
            workers: 1,
            spill_dir: None,
        }
    }
}

impl ClickHouseConfig {
    /// Creates a configuration for `url` and `table`; every other field takes
    /// its [`Default`] value.
    pub fn new(url: impl Into<String>, table: impl Into<String>) -> Self {
        Self {
            url: url.into(),
            table: table.into(),
            ..Self::default()
        }
    }

    /// Sets the maximum number of rows per batch.
    #[must_use]
    pub fn with_max_batch_rows(mut self, max_batch_rows: usize) -> Self {
        self.max_batch_rows = max_batch_rows;
        self
    }

    /// Sets the total number of send attempts per batch.
    #[must_use]
    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
        self.max_retries = max_retries;
        self
    }

    /// Sets the base delay of the exponential backoff.
    #[must_use]
    pub fn with_base_retry_delay(mut self, base_retry_delay: Duration) -> Self {
        self.base_retry_delay = base_retry_delay;
        self
    }

    /// Sets the upper bound of the backoff delay.
    #[must_use]
    pub fn with_max_retry_delay(mut self, max_retry_delay: Duration) -> Self {
        self.max_retry_delay = max_retry_delay;
        self
    }

    /// Sets the user name and password sent with every request.
    #[must_use]
    pub fn with_auth(mut self, username: impl Into<String>, password: impl Into<String>) -> Self {
        self.username = Some(username.into());
        self.password = Some(password.into());
        self
    }

    /// Sets the database sent with every request.
    #[must_use]
    pub fn with_database(mut self, database: impl Into<String>) -> Self {
        self.database = Some(database.into());
        self
    }

    /// Sets the connect and read timeouts of the HTTP client.
    #[must_use]
    pub fn with_timeouts(mut self, connect: Duration, read: Duration) -> Self {
        self.connect_timeout = connect;
        self.read_timeout = read;
        self
    }

    /// Enables or disables gzip compression of request bodies.
    #[must_use]
    pub fn with_gzip(mut self, enabled: bool) -> Self {
        self.gzip = enabled;
        self
    }

    /// Sets the number of worker threads, clamped to at least one.
    #[must_use]
    pub fn with_workers(mut self, workers: usize) -> Self {
        self.workers = workers.max(1);
        self
    }

    /// Sets the directory that receives batches which could not be delivered.
    #[must_use]
    pub fn with_spill_dir(mut self, dir: impl Into<std::path::PathBuf>) -> Self {
        self.spill_dir = Some(dir.into());
        self
    }
}
233
234pub enum WriterMessage {
236 Batch(RecordBatch),
238 Shutdown,
240}
241
242pub struct ClickHouseWriter {
248 sender: mpsc::SyncSender<WriterMessage>,
249 handles: Vec<JoinHandle<WriterStats>>,
250 metrics: Arc<WriterMetricsInner>,
251 workers: usize,
252}
253
/// Per-worker totals, returned when a worker thread finishes and summed on shutdown.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct WriterStats {
    /// Batches taken off the channel.
    pub batches_received: u64,
    /// Rows contained in the received batches.
    pub rows_received: u64,
    /// Batches that were delivered successfully.
    pub batches_sent: u64,
    /// Rows contained in the delivered batches.
    pub rows_sent: u64,
    /// Failed send attempts (one batch can contribute several).
    pub errors: u64,
    /// Batches that were not delivered after all attempts.
    pub batches_failed: u64,
    /// Rows contained in the undelivered batches.
    pub rows_failed: u64,
    /// Undelivered batches that were written to the spill directory.
    pub batches_spilled: u64,
    /// Rows contained in the spilled batches.
    pub rows_spilled: u64,
    /// Highest value the outstanding-batch counter reached.
    pub max_outstanding_batches: u64,
}
281
/// Point-in-time copy of the live writer counters.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct WriterMetricsSnapshot {
    /// Batches handed to `send` that no worker has picked up yet.
    pub outstanding_batches: u64,
    /// Batches taken off the channel by workers.
    pub batches_received: u64,
    /// Rows contained in the received batches.
    pub rows_received: u64,
    /// Batches that were delivered successfully.
    pub batches_sent: u64,
    /// Rows contained in the delivered batches.
    pub rows_sent: u64,
    /// Failed send attempts.
    pub errors: u64,
    /// Batches that were not delivered after all attempts.
    pub batches_failed: u64,
    /// Rows contained in the undelivered batches.
    pub rows_failed: u64,
    /// Undelivered batches that were written to the spill directory.
    pub batches_spilled: u64,
    /// Rows contained in the spilled batches.
    pub rows_spilled: u64,
    /// Highest value `outstanding_batches` has reached.
    pub max_outstanding_batches: u64,
}

/// Atomic counters shared between the writer handle and its worker threads.
#[derive(Default)]
struct WriterMetricsInner {
    outstanding_batches: AtomicU64,
    batches_received: AtomicU64,
    rows_received: AtomicU64,
    batches_sent: AtomicU64,
    rows_sent: AtomicU64,
    errors: AtomicU64,
    batches_failed: AtomicU64,
    rows_failed: AtomicU64,
    batches_spilled: AtomicU64,
    rows_spilled: AtomicU64,
    max_outstanding_batches: AtomicU64,
}

impl WriterMetricsInner {
    /// Reads every counter into a plain [`WriterMetricsSnapshot`].
    ///
    /// Each counter is loaded independently with relaxed ordering, so the
    /// snapshot is not an atomic view across counters.
    fn snapshot(&self) -> WriterMetricsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        WriterMetricsSnapshot {
            outstanding_batches: read(&self.outstanding_batches),
            batches_received: read(&self.batches_received),
            rows_received: read(&self.rows_received),
            batches_sent: read(&self.batches_sent),
            rows_sent: read(&self.rows_sent),
            errors: read(&self.errors),
            batches_failed: read(&self.batches_failed),
            rows_failed: read(&self.rows_failed),
            batches_spilled: read(&self.batches_spilled),
            rows_spilled: read(&self.rows_spilled),
            max_outstanding_batches: read(&self.max_outstanding_batches),
        }
    }

    /// Raises the recorded peak to `value` if `value` is larger; never lowers it.
    fn update_max_outstanding(&self, value: u64) {
        // `fetch_max` performs the compare-and-swap retry loop internally.
        self.max_outstanding_batches
            .fetch_max(value, Ordering::Relaxed);
    }
}
356
357impl ClickHouseWriter {
358 pub fn new(config: ClickHouseConfig, capacity: usize) -> Result<Self, ClickHouseError> {
363 if capacity == 0 {
364 return Err(ClickHouseError::Config(
365 "channel capacity must be positive".to_string(),
366 ));
367 }
368 if config.max_retries == 0 {
369 return Err(ClickHouseError::Config(
370 "max_retries must be positive".to_string(),
371 ));
372 }
373 if config.url.trim().is_empty() {
374 return Err(ClickHouseError::Config("url must not be empty".to_string()));
375 }
376 if config.table.trim().is_empty() {
377 return Err(ClickHouseError::Config(
378 "table must not be empty".to_string(),
379 ));
380 }
381
382 let (sender, receiver) = mpsc::sync_channel::<WriterMessage>(capacity);
383 let metrics = Arc::new(WriterMetricsInner::default());
384 let workers = config.workers.max(1);
385 let receiver = Arc::new(Mutex::new(receiver));
386
387 let mut handles = Vec::with_capacity(workers);
388 for worker_id in 0..workers {
389 let metrics_for_thread = Arc::clone(&metrics);
390 let receiver_for_thread = Arc::clone(&receiver);
391 let config_for_thread = config.clone();
392 let handle = thread::spawn(move || {
393 let mut stats = WriterStats::default();
394 writer_loop(
395 worker_id,
396 &config_for_thread,
397 &receiver_for_thread,
398 &mut stats,
399 &metrics_for_thread,
400 );
401 stats
402 });
403 handles.push(handle);
404 }
405
406 Ok(Self {
407 sender,
408 handles,
409 metrics,
410 workers,
411 })
412 }
413
414 pub fn send(&self, batch: RecordBatch) -> Result<(), ClickHouseError> {
418 let outstanding = self
419 .metrics
420 .outstanding_batches
421 .fetch_add(1, Ordering::Relaxed)
422 + 1;
423 self.metrics.update_max_outstanding(outstanding);
424
425 if let Err(e) = self.sender.send(WriterMessage::Batch(batch)) {
426 self.metrics
427 .outstanding_batches
428 .fetch_sub(1, Ordering::Relaxed);
429 return Err(ClickHouseError::ChannelClosed(e.to_string()));
430 }
431
432 Ok(())
433 }
434
435 pub fn metrics(&self) -> WriterMetricsSnapshot {
437 self.metrics.snapshot()
438 }
439
440 pub fn shutdown(mut self) -> WriterStats {
442 for _ in 0..self.workers {
444 let _ = self.sender.send(WriterMessage::Shutdown);
445 }
446 let mut merged = WriterStats::default();
447 for h in self.handles.drain(..) {
448 if let Ok(stats) = h.join() {
449 merged.batches_received += stats.batches_received;
450 merged.rows_received += stats.rows_received;
451 merged.batches_sent += stats.batches_sent;
452 merged.rows_sent += stats.rows_sent;
453 merged.errors += stats.errors;
454 merged.batches_failed += stats.batches_failed;
455 merged.rows_failed += stats.rows_failed;
456 merged.batches_spilled += stats.batches_spilled;
457 merged.rows_spilled += stats.rows_spilled;
458 merged.max_outstanding_batches = merged
459 .max_outstanding_batches
460 .max(stats.max_outstanding_batches);
461 }
462 }
463 merged
464 }
465}
466
467fn writer_loop(
468 worker_id: usize,
469 config: &ClickHouseConfig,
470 receiver: &Mutex<mpsc::Receiver<WriterMessage>>,
471 stats: &mut WriterStats,
472 metrics: &WriterMetricsInner,
473) {
474 info!(
475 worker_id,
476 workers = config.workers,
477 table = %config.table,
478 url = %config.url,
479 database = ?config.database,
480 auth = config.username.is_some(),
481 gzip = config.gzip && cfg!(feature = "gzip"),
482 "ClickHouse writer started"
483 );
484
485 let agent = ureq::AgentBuilder::new()
486 .timeout_connect(config.connect_timeout)
487 .timeout_read(config.read_timeout)
488 .build();
489
490 let mut rng = rand::thread_rng();
491
492 loop {
493 let msg = match receiver.lock() {
497 Ok(guard) => guard.recv(),
498 Err(poisoned) => poisoned.into_inner().recv(),
499 };
500 let batch = match msg {
501 Ok(WriterMessage::Batch(b)) => b,
502 Ok(WriterMessage::Shutdown) => break,
503 Err(_) => break,
504 };
505
506 metrics.outstanding_batches.fetch_sub(1, Ordering::Relaxed);
507 let rows = batch.num_rows() as u64;
508 stats.batches_received += 1;
509 stats.rows_received += rows;
510 metrics.batches_received.fetch_add(1, Ordering::Relaxed);
511 metrics.rows_received.fetch_add(rows, Ordering::Relaxed);
512
513 let mut success = false;
514 for attempt in 1..=config.max_retries {
515 match send_batch_http(&agent, config, &batch) {
516 Ok(()) => {
517 debug!(worker_id, rows, attempt, "batch sent successfully");
518 success = true;
519 break;
520 }
521 Err(e) => {
522 stats.errors += 1;
523 metrics.errors.fetch_add(1, Ordering::Relaxed);
524
525 if attempt < config.max_retries {
526 let delay = backoff_with_jitter(
527 config.base_retry_delay,
528 config.max_retry_delay,
529 attempt,
530 &mut rng,
531 );
532 warn!(
533 worker_id,
534 attempt,
535 max_retries = config.max_retries,
536 delay_ms = delay.as_millis() as u64,
537 error = %e,
538 "batch send failed, retrying after backoff"
539 );
540 thread::sleep(delay);
541 } else {
542 warn!(
543 worker_id,
544 attempt,
545 max_retries = config.max_retries,
546 error = %e,
547 "batch send failed, no retries remaining"
548 );
549 }
550 }
551 }
552 }
553 if success {
554 stats.batches_sent += 1;
555 stats.rows_sent += rows;
556 metrics.batches_sent.fetch_add(1, Ordering::Relaxed);
557 metrics.rows_sent.fetch_add(rows, Ordering::Relaxed);
558 } else {
559 stats.batches_failed += 1;
560 stats.rows_failed += rows;
561 metrics.batches_failed.fetch_add(1, Ordering::Relaxed);
562 metrics.rows_failed.fetch_add(rows, Ordering::Relaxed);
563
564 #[cfg(feature = "spill")]
565 if let Some(dir) = config.spill_dir.as_ref() {
566 match spill_batch_to_disk(dir, worker_id, &batch) {
567 Ok(path) => {
568 stats.batches_spilled += 1;
569 stats.rows_spilled += rows;
570 metrics.batches_spilled.fetch_add(1, Ordering::Relaxed);
571 metrics.rows_spilled.fetch_add(rows, Ordering::Relaxed);
572 warn!(worker_id, rows, path = %path.display(), "batch spilled to disk after all retries exhausted");
573 }
574 Err(e) => {
575 error!(worker_id, rows, error = %e, "batch dropped: retries exhausted AND spill failed");
576 }
577 }
578 } else {
579 error!(worker_id, rows, "batch dropped after all retries exhausted");
580 }
581 #[cfg(not(feature = "spill"))]
582 error!(worker_id, rows, "batch dropped after all retries exhausted");
583 }
584 }
585
586 stats.max_outstanding_batches = metrics.max_outstanding_batches.load(Ordering::Relaxed);
587
588 info!(
589 worker_id,
590 batches_sent = stats.batches_sent,
591 rows_sent = stats.rows_sent,
592 errors = stats.errors,
593 "ClickHouse writer shut down"
594 );
595}
596
597fn backoff_with_jitter(
601 base: Duration,
602 max_delay: Duration,
603 attempt: u32,
604 rng: &mut impl Rng,
605) -> Duration {
606 let exp = base.saturating_mul(1u32 << (attempt - 1).min(30));
607 let capped = exp.min(max_delay);
608 let jitter_nanos = rng.gen_range(0..=capped.as_nanos().min(u64::MAX as u128) as u64);
609 Duration::from_nanos(jitter_nanos)
610}
611
/// Writes `batch` as an Arrow IPC stream into `dir` and returns the file path.
///
/// The directory is created if needed. The file name combines the current
/// time in nanoseconds, the process id, the worker id and a process-wide
/// sequence number. Data is first written to a `.tmp` sibling, synced, and
/// then renamed, so a file with the final `.arrows` name is always complete.
///
/// # Errors
///
/// Filesystem failures are reported as [`ClickHouseError::Config`], encoding
/// failures as [`ClickHouseError::Arrow`]. On failure the temporary file is
/// removed on a best-effort basis.
#[cfg(feature = "spill")]
fn spill_batch_to_disk(
    dir: &std::path::Path,
    worker_id: usize,
    batch: &RecordBatch,
) -> Result<std::path::PathBuf, ClickHouseError> {
    use std::fs::{self, File};
    use std::io::{BufWriter, Write};
    use std::sync::atomic::AtomicU64;
    use std::time::{SystemTime, UNIX_EPOCH};

    static SPILL_SEQ: AtomicU64 = AtomicU64::new(0);

    fs::create_dir_all(dir)
        .map_err(|e| ClickHouseError::Config(format!("cannot create spill dir: {e}")))?;

    // A clock before the epoch falls back to 0; pid, worker id and sequence
    // number still keep the name unique within this process.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    let pid = std::process::id();
    let seq = SPILL_SEQ.fetch_add(1, Ordering::Relaxed);
    let file_name = format!("batch-{nanos}-{pid}-{worker_id}-{seq}.arrows");
    let path = dir.join(&file_name);
    let tmp_path = dir.join(format!("{file_name}.tmp"));

    let write_tmp = || -> Result<(), ClickHouseError> {
        let file = File::create(&tmp_path)
            .map_err(|e| ClickHouseError::Config(format!("cannot create spill file: {e}")))?;
        let mut bw = BufWriter::new(file);
        {
            let mut writer =
                StreamWriter::try_new(&mut bw, &batch.schema()).map_err(ClickHouseError::Arrow)?;
            writer.write(batch).map_err(ClickHouseError::Arrow)?;
            writer.finish().map_err(ClickHouseError::Arrow)?;
        }
        // Flush explicitly: dropping a `BufWriter` swallows write errors.
        bw.flush()
            .map_err(|e| ClickHouseError::Config(format!("spill flush failed: {e}")))?;
        // The spill file is the last copy of the data, so sync it to disk.
        bw.get_ref()
            .sync_all()
            .map_err(|e| ClickHouseError::Config(format!("spill sync failed: {e}")))?;
        Ok(())
    };

    let outcome = write_tmp().and_then(|()| {
        fs::rename(&tmp_path, &path)
            .map_err(|e| ClickHouseError::Config(format!("cannot finalize spill file: {e}")))
    });
    if let Err(e) = outcome {
        // Best effort: do not leave a truncated file behind.
        let _ = fs::remove_file(&tmp_path);
        return Err(e);
    }
    Ok(path)
}
653
654fn send_batch_http(
656 agent: &ureq::Agent,
657 config: &ClickHouseConfig,
658 batch: &RecordBatch,
659) -> Result<(), ClickHouseError> {
660 let mut buf = Vec::new();
662 {
663 let mut writer =
664 StreamWriter::try_new(&mut buf, &batch.schema()).map_err(ClickHouseError::Arrow)?;
665 writer.write(batch).map_err(ClickHouseError::Arrow)?;
666 writer.finish().map_err(ClickHouseError::Arrow)?;
667 }
668
669 let query = format!("INSERT INTO {} FORMAT ArrowStream", config.table);
670 let url = format!("{}/?query={}", config.url, query);
671
672 let mut request = agent
673 .post(&url)
674 .set("Content-Type", "application/octet-stream");
675
676 if let Some(user) = &config.username {
677 request = request.set("X-ClickHouse-User", user);
678 }
679 if let Some(pw) = &config.password {
680 request = request.set("X-ClickHouse-Key", pw);
681 }
682 if let Some(db) = &config.database {
683 request = request.set("X-ClickHouse-Database", db);
684 }
685
686 let body = compress_if_enabled(config, buf)?;
687 if config.gzip && cfg!(feature = "gzip") {
688 request = request.set("Content-Encoding", "gzip");
689 }
690
691 let response = request.send_bytes(&body);
692
693 match response {
694 Ok(resp) => {
695 let status = resp.status();
696 if status == 200 {
697 Ok(())
698 } else {
699 let body = resp.into_string().unwrap_or_default();
700 Err(ClickHouseError::Http(format!("HTTP {}: {}", status, body)))
701 }
702 }
703 Err(e) => Err(ClickHouseError::Http(e.to_string())),
704 }
705}
706
/// Returns `buf` gzip-compressed when `config.gzip` is set, otherwise unchanged.
///
/// # Errors
///
/// Compression failures are reported as [`ClickHouseError::Http`].
#[cfg(feature = "gzip")]
fn compress_if_enabled(
    config: &ClickHouseConfig,
    buf: Vec<u8>,
) -> Result<Vec<u8>, ClickHouseError> {
    use flate2::write::GzEncoder;
    use flate2::Compression;
    use std::io::Write;

    if config.gzip {
        let sink = Vec::with_capacity(buf.len());
        let mut gz = GzEncoder::new(sink, Compression::default());
        if let Err(e) = gz.write_all(&buf) {
            return Err(ClickHouseError::Http(format!("gzip encode: {e}")));
        }
        match gz.finish() {
            Ok(compressed) => Ok(compressed),
            Err(e) => Err(ClickHouseError::Http(format!("gzip finish: {e}"))),
        }
    } else {
        Ok(buf)
    }
}
726
727#[cfg(not(feature = "gzip"))]
728fn compress_if_enabled(
729 _config: &ClickHouseConfig,
730 buf: Vec<u8>,
731) -> Result<Vec<u8>, ClickHouseError> {
732 Ok(buf)
733}