use arrow_array::RecordBatch;
use arrow_ipc::writer::StreamWriter;
use rand::Rng;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{mpsc, Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::Duration;
use thiserror::Error;
use tracing::{debug, error, info, warn};
/// Errors produced by the ClickHouse writer defined in this file.
#[derive(Debug, Error)]
pub enum ClickHouseError {
/// A configuration value was rejected by `ClickHouseWriter::new`.
/// The spill-to-disk path also reports filesystem failures through this variant.
#[error("invalid writer configuration: {0}")]
Config(String),
/// Sending a message on the writer channel failed.
#[error("writer channel closed: {0}")]
ChannelClosed(String),
/// The HTTP insert failed; gzip encoding failures are reported here as well.
#[error("HTTP request failed: {0}")]
Http(String),
/// Arrow IPC serialization failed. Converts from `ArrowError` through `#[from]`.
#[error("Arrow serialization failed: {0}")]
Arrow(#[from] arrow_schema::ArrowError),
}
/// Connection and behaviour settings for the ClickHouse writer.
///
/// `Debug` is implemented by hand (instead of derived) so that the password
/// never ends up in logs or panic messages when the config is printed with
/// `{:?}`.
#[derive(Clone)]
pub struct ClickHouseConfig {
    /// Base URL of the ClickHouse HTTP endpoint.
    pub url: String,
    /// Target table; interpolated into the `INSERT INTO … FORMAT ArrowStream` query.
    pub table: String,
    /// Optional database, sent as the `X-ClickHouse-Database` header.
    pub database: Option<String>,
    /// Optional user, sent as the `X-ClickHouse-User` header.
    pub username: Option<String>,
    /// Optional password, sent as the `X-ClickHouse-Key` header.
    pub password: Option<String>,
    /// Maximum rows per batch.
    /// NOTE(review): not read anywhere in this part of the file — confirm where it is enforced.
    pub max_batch_rows: usize,
    /// Total number of send attempts per batch (the first attempt counts); must be > 0.
    pub max_retries: u32,
    /// Base delay for the exponential backoff between attempts.
    pub base_retry_delay: Duration,
    /// Upper bound applied to the backoff delay.
    pub max_retry_delay: Duration,
    /// Connect timeout handed to the HTTP agent.
    pub connect_timeout: Duration,
    /// Read timeout handed to the HTTP agent.
    pub read_timeout: Duration,
    /// Request gzip-compressed bodies; only effective when the `gzip` feature is compiled in.
    pub gzip: bool,
    /// Number of worker threads; values below 1 are treated as 1.
    pub workers: usize,
    /// Directory for batches that exhausted all attempts; only used with the `spill` feature.
    pub spill_dir: Option<std::path::PathBuf>,
}

impl std::fmt::Debug for ClickHouseConfig {
    /// Formats every field like a derived `Debug` would, except that a
    /// configured password is replaced by a fixed placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("ClickHouseConfig")
            .field("url", &self.url)
            .field("table", &self.table)
            .field("database", &self.database)
            .field("username", &self.username)
            // Keep the Some/None distinction visible without leaking the secret.
            .field("password", &self.password.as_ref().map(|_| "<redacted>"))
            .field("max_batch_rows", &self.max_batch_rows)
            .field("max_retries", &self.max_retries)
            .field("base_retry_delay", &self.base_retry_delay)
            .field("max_retry_delay", &self.max_retry_delay)
            .field("connect_timeout", &self.connect_timeout)
            .field("read_timeout", &self.read_timeout)
            .field("gzip", &self.gzip)
            .field("workers", &self.workers)
            .field("spill_dir", &self.spill_dir)
            .finish()
    }
}
impl Default for ClickHouseConfig {
fn default() -> Self {
Self {
url: "http://localhost:8123".to_string(),
table: "agent_data".to_string(),
database: None,
username: None,
password: None,
max_batch_rows: 100_000,
max_retries: 3,
base_retry_delay: Duration::from_millis(100),
max_retry_delay: Duration::from_secs(10),
connect_timeout: Duration::from_secs(10),
read_timeout: Duration::from_secs(60),
gzip: false,
workers: 1,
spill_dir: None,
}
}
}
impl ClickHouseConfig {
    /// Creates a configuration for `url` and `table`; every other field takes
    /// its `Default` value.
    pub fn new(url: impl Into<String>, table: impl Into<String>) -> Self {
        let mut cfg = Self::default();
        cfg.url = url.into();
        cfg.table = table.into();
        cfg
    }

    /// Returns the configuration with `max_batch_rows` replaced.
    pub fn with_max_batch_rows(self, max_batch_rows: usize) -> Self {
        Self {
            max_batch_rows,
            ..self
        }
    }

    /// Returns the configuration with `max_retries` replaced.
    pub fn with_max_retries(self, max_retries: u32) -> Self {
        Self {
            max_retries,
            ..self
        }
    }

    /// Returns the configuration with the base backoff delay replaced.
    pub fn with_base_retry_delay(self, base_retry_delay: Duration) -> Self {
        Self {
            base_retry_delay,
            ..self
        }
    }

    /// Returns the configuration with the maximum backoff delay replaced.
    pub fn with_max_retry_delay(self, max_retry_delay: Duration) -> Self {
        Self {
            max_retry_delay,
            ..self
        }
    }

    /// Returns the configuration with both username and password set.
    pub fn with_auth(self, username: impl Into<String>, password: impl Into<String>) -> Self {
        Self {
            username: Some(username.into()),
            password: Some(password.into()),
            ..self
        }
    }

    /// Returns the configuration with the database set.
    pub fn with_database(self, database: impl Into<String>) -> Self {
        Self {
            database: Some(database.into()),
            ..self
        }
    }

    /// Returns the configuration with the connect and read timeouts replaced.
    pub fn with_timeouts(self, connect: Duration, read: Duration) -> Self {
        Self {
            connect_timeout: connect,
            read_timeout: read,
            ..self
        }
    }

    /// Returns the configuration with the gzip flag replaced.
    pub fn with_gzip(self, enabled: bool) -> Self {
        Self {
            gzip: enabled,
            ..self
        }
    }

    /// Returns the configuration with the worker count replaced; a value
    /// below 1 is raised to 1.
    pub fn with_workers(self, workers: usize) -> Self {
        Self {
            workers: workers.max(1),
            ..self
        }
    }

    /// Returns the configuration with the spill directory set.
    pub fn with_spill_dir(self, dir: impl Into<std::path::PathBuf>) -> Self {
        Self {
            spill_dir: Some(dir.into()),
            ..self
        }
    }
}
/// Message carried on the channel between the writer handle and its workers.
pub enum WriterMessage {
/// A record batch for a worker to send to ClickHouse.
Batch(RecordBatch),
/// Makes the worker that receives it leave its loop.
Shutdown,
}
/// Handle to a set of writer threads fed through one bounded channel.
pub struct ClickHouseWriter {
/// Producer side of the bounded channel shared by all workers.
sender: mpsc::SyncSender<WriterMessage>,
/// Join handles of the worker threads; each thread returns its own `WriterStats`.
handles: Vec<JoinHandle<WriterStats>>,
/// Counters shared with the worker threads.
metrics: Arc<WriterMetricsInner>,
/// Number of worker threads that were spawned (at least 1).
workers: usize,
}
/// Per-worker counters, returned by each worker thread and summed by
/// `ClickHouseWriter::shutdown`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct WriterStats {
/// Batches taken off the channel.
pub batches_received: u64,
/// Rows contained in the received batches.
pub rows_received: u64,
/// Batches for which a send attempt succeeded.
pub batches_sent: u64,
/// Rows contained in the successfully sent batches.
pub rows_sent: u64,
/// Failed send attempts (one batch can contribute several).
pub errors: u64,
/// Batches for which no attempt succeeded.
pub batches_failed: u64,
/// Rows contained in the failed batches.
pub rows_failed: u64,
/// Failed batches that were written to the spill directory.
pub batches_spilled: u64,
/// Rows contained in the spilled batches.
pub rows_spilled: u64,
/// High-water mark of outstanding batches, copied from the shared metrics when the worker exits.
pub max_outstanding_batches: u64,
}
/// Plain-value copy of the shared counters, as returned by `ClickHouseWriter::metrics`.
/// Each field is read with its own relaxed atomic load, so the fields are not
/// captured at one single instant.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct WriterMetricsSnapshot {
/// Batches handed to `send` that no worker has taken off the channel yet.
pub outstanding_batches: u64,
/// Batches taken off the channel by workers.
pub batches_received: u64,
/// Rows contained in the received batches.
pub rows_received: u64,
/// Batches for which a send attempt succeeded.
pub batches_sent: u64,
/// Rows contained in the successfully sent batches.
pub rows_sent: u64,
/// Failed send attempts (one batch can contribute several).
pub errors: u64,
/// Batches for which no attempt succeeded.
pub batches_failed: u64,
/// Rows contained in the failed batches.
pub rows_failed: u64,
/// Failed batches that were written to the spill directory.
pub batches_spilled: u64,
/// Rows contained in the spilled batches.
pub rows_spilled: u64,
/// Largest value `outstanding_batches` has reached so far.
pub max_outstanding_batches: u64,
}
/// Atomic counters shared between the writer handle and its worker threads.
/// Field meanings mirror `WriterMetricsSnapshot`.
#[derive(Default)]
struct WriterMetricsInner {
// Incremented by `send` before enqueueing, decremented by a worker after dequeueing.
outstanding_batches: AtomicU64,
batches_received: AtomicU64,
rows_received: AtomicU64,
batches_sent: AtomicU64,
rows_sent: AtomicU64,
// Counts failed attempts, not failed batches.
errors: AtomicU64,
batches_failed: AtomicU64,
rows_failed: AtomicU64,
batches_spilled: AtomicU64,
rows_spilled: AtomicU64,
// Only ever raised, via `update_max_outstanding`.
max_outstanding_batches: AtomicU64,
}
impl WriterMetricsInner {
fn snapshot(&self) -> WriterMetricsSnapshot {
WriterMetricsSnapshot {
outstanding_batches: self.outstanding_batches.load(Ordering::Relaxed),
batches_received: self.batches_received.load(Ordering::Relaxed),
rows_received: self.rows_received.load(Ordering::Relaxed),
batches_sent: self.batches_sent.load(Ordering::Relaxed),
rows_sent: self.rows_sent.load(Ordering::Relaxed),
errors: self.errors.load(Ordering::Relaxed),
batches_failed: self.batches_failed.load(Ordering::Relaxed),
rows_failed: self.rows_failed.load(Ordering::Relaxed),
batches_spilled: self.batches_spilled.load(Ordering::Relaxed),
rows_spilled: self.rows_spilled.load(Ordering::Relaxed),
max_outstanding_batches: self.max_outstanding_batches.load(Ordering::Relaxed),
}
}
fn update_max_outstanding(&self, value: u64) {
let mut current = self.max_outstanding_batches.load(Ordering::Relaxed);
while value > current {
match self.max_outstanding_batches.compare_exchange_weak(
current,
value,
Ordering::Relaxed,
Ordering::Relaxed,
) {
Ok(_) => break,
Err(observed) => current = observed,
}
}
}
}
impl ClickHouseWriter {
pub fn new(config: ClickHouseConfig, capacity: usize) -> Result<Self, ClickHouseError> {
if capacity == 0 {
return Err(ClickHouseError::Config(
"channel capacity must be positive".to_string(),
));
}
if config.max_retries == 0 {
return Err(ClickHouseError::Config(
"max_retries must be positive".to_string(),
));
}
if config.url.trim().is_empty() {
return Err(ClickHouseError::Config("url must not be empty".to_string()));
}
if config.table.trim().is_empty() {
return Err(ClickHouseError::Config(
"table must not be empty".to_string(),
));
}
let (sender, receiver) = mpsc::sync_channel::<WriterMessage>(capacity);
let metrics = Arc::new(WriterMetricsInner::default());
let workers = config.workers.max(1);
let receiver = Arc::new(Mutex::new(receiver));
let mut handles = Vec::with_capacity(workers);
for worker_id in 0..workers {
let metrics_for_thread = Arc::clone(&metrics);
let receiver_for_thread = Arc::clone(&receiver);
let config_for_thread = config.clone();
let handle = thread::spawn(move || {
let mut stats = WriterStats::default();
writer_loop(
worker_id,
&config_for_thread,
&receiver_for_thread,
&mut stats,
&metrics_for_thread,
);
stats
});
handles.push(handle);
}
Ok(Self {
sender,
handles,
metrics,
workers,
})
}
pub fn send(&self, batch: RecordBatch) -> Result<(), ClickHouseError> {
let outstanding = self
.metrics
.outstanding_batches
.fetch_add(1, Ordering::Relaxed)
+ 1;
self.metrics.update_max_outstanding(outstanding);
if let Err(e) = self.sender.send(WriterMessage::Batch(batch)) {
self.metrics
.outstanding_batches
.fetch_sub(1, Ordering::Relaxed);
return Err(ClickHouseError::ChannelClosed(e.to_string()));
}
Ok(())
}
pub fn metrics(&self) -> WriterMetricsSnapshot {
self.metrics.snapshot()
}
pub fn shutdown(mut self) -> WriterStats {
for _ in 0..self.workers {
let _ = self.sender.send(WriterMessage::Shutdown);
}
let mut merged = WriterStats::default();
for h in self.handles.drain(..) {
if let Ok(stats) = h.join() {
merged.batches_received += stats.batches_received;
merged.rows_received += stats.rows_received;
merged.batches_sent += stats.batches_sent;
merged.rows_sent += stats.rows_sent;
merged.errors += stats.errors;
merged.batches_failed += stats.batches_failed;
merged.rows_failed += stats.rows_failed;
merged.batches_spilled += stats.batches_spilled;
merged.rows_spilled += stats.rows_spilled;
merged.max_outstanding_batches = merged
.max_outstanding_batches
.max(stats.max_outstanding_batches);
}
}
merged
}
}
fn writer_loop(
worker_id: usize,
config: &ClickHouseConfig,
receiver: &Mutex<mpsc::Receiver<WriterMessage>>,
stats: &mut WriterStats,
metrics: &WriterMetricsInner,
) {
info!(
worker_id,
workers = config.workers,
table = %config.table,
url = %config.url,
database = ?config.database,
auth = config.username.is_some(),
gzip = config.gzip && cfg!(feature = "gzip"),
"ClickHouse writer started"
);
let agent = ureq::AgentBuilder::new()
.timeout_connect(config.connect_timeout)
.timeout_read(config.read_timeout)
.build();
let mut rng = rand::thread_rng();
loop {
let msg = match receiver.lock() {
Ok(guard) => guard.recv(),
Err(poisoned) => poisoned.into_inner().recv(),
};
let batch = match msg {
Ok(WriterMessage::Batch(b)) => b,
Ok(WriterMessage::Shutdown) => break,
Err(_) => break,
};
metrics.outstanding_batches.fetch_sub(1, Ordering::Relaxed);
let rows = batch.num_rows() as u64;
stats.batches_received += 1;
stats.rows_received += rows;
metrics.batches_received.fetch_add(1, Ordering::Relaxed);
metrics.rows_received.fetch_add(rows, Ordering::Relaxed);
let mut success = false;
for attempt in 1..=config.max_retries {
match send_batch_http(&agent, config, &batch) {
Ok(()) => {
debug!(worker_id, rows, attempt, "batch sent successfully");
success = true;
break;
}
Err(e) => {
stats.errors += 1;
metrics.errors.fetch_add(1, Ordering::Relaxed);
if attempt < config.max_retries {
let delay = backoff_with_jitter(
config.base_retry_delay,
config.max_retry_delay,
attempt,
&mut rng,
);
warn!(
worker_id,
attempt,
max_retries = config.max_retries,
delay_ms = delay.as_millis() as u64,
error = %e,
"batch send failed, retrying after backoff"
);
thread::sleep(delay);
} else {
warn!(
worker_id,
attempt,
max_retries = config.max_retries,
error = %e,
"batch send failed, no retries remaining"
);
}
}
}
}
if success {
stats.batches_sent += 1;
stats.rows_sent += rows;
metrics.batches_sent.fetch_add(1, Ordering::Relaxed);
metrics.rows_sent.fetch_add(rows, Ordering::Relaxed);
} else {
stats.batches_failed += 1;
stats.rows_failed += rows;
metrics.batches_failed.fetch_add(1, Ordering::Relaxed);
metrics.rows_failed.fetch_add(rows, Ordering::Relaxed);
#[cfg(feature = "spill")]
if let Some(dir) = config.spill_dir.as_ref() {
match spill_batch_to_disk(dir, worker_id, &batch) {
Ok(path) => {
stats.batches_spilled += 1;
stats.rows_spilled += rows;
metrics.batches_spilled.fetch_add(1, Ordering::Relaxed);
metrics.rows_spilled.fetch_add(rows, Ordering::Relaxed);
warn!(worker_id, rows, path = %path.display(), "batch spilled to disk after all retries exhausted");
}
Err(e) => {
error!(worker_id, rows, error = %e, "batch dropped: retries exhausted AND spill failed");
}
}
} else {
error!(worker_id, rows, "batch dropped after all retries exhausted");
}
#[cfg(not(feature = "spill"))]
error!(worker_id, rows, "batch dropped after all retries exhausted");
}
}
stats.max_outstanding_batches = metrics.max_outstanding_batches.load(Ordering::Relaxed);
info!(
worker_id,
batches_sent = stats.batches_sent,
rows_sent = stats.rows_sent,
errors = stats.errors,
"ClickHouse writer shut down"
);
}
/// Computes the sleep before the next attempt: a uniformly random duration
/// between zero and `min(base * 2^(attempt - 1), max_delay)`, inclusive.
///
/// `attempt` is the 1-based number of the attempt that just failed. The
/// exponent is clamped to 30 and the multiplication saturates, so large
/// attempt numbers cannot overflow. `attempt == 0` is treated like
/// `attempt == 1` instead of underflowing the subtraction.
fn backoff_with_jitter(
    base: Duration,
    max_delay: Duration,
    attempt: u32,
    rng: &mut impl Rng,
) -> Duration {
    let exponent = attempt.saturating_sub(1).min(30);
    let capped = base.saturating_mul(1u32 << exponent).min(max_delay);
    // `as_nanos` is u128; clamp instead of truncating if it exceeds u64.
    let ceiling_nanos = u64::try_from(capped.as_nanos()).unwrap_or(u64::MAX);
    Duration::from_nanos(rng.gen_range(0..=ceiling_nanos))
}
/// Writes `batch` as an Arrow IPC stream into `dir` and returns the path of
/// the new file.
///
/// The file name combines the current time in nanoseconds, the process id,
/// `worker_id` and a process-wide sequence number. Data is first written to
/// a sibling `*.arrows.tmp` file and renamed to `*.arrows` only after it was
/// written and flushed completely, so a failed spill never leaves a
/// truncated file under the final name. On failure the temporary file is
/// removed on a best-effort basis.
///
/// # Errors
///
/// Filesystem failures are reported as `ClickHouseError::Config` (the error
/// enum has no dedicated I/O variant); serialization failures as
/// `ClickHouseError::Arrow`.
#[cfg(feature = "spill")]
fn spill_batch_to_disk(
    dir: &std::path::Path,
    worker_id: usize,
    batch: &RecordBatch,
) -> Result<std::path::PathBuf, ClickHouseError> {
    use std::fs::{self, File};
    use std::io::{BufWriter, Write};
    use std::sync::atomic::AtomicU64;
    use std::time::{SystemTime, UNIX_EPOCH};

    static SPILL_SEQ: AtomicU64 = AtomicU64::new(0);

    fs::create_dir_all(dir)
        .map_err(|e| ClickHouseError::Config(format!("cannot create spill dir: {e}")))?;

    // A clock before the epoch degrades to 0; pid and sequence number still
    // keep names distinct within this process.
    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    let pid = std::process::id();
    let seq = SPILL_SEQ.fetch_add(1, Ordering::Relaxed);
    let stem = format!("batch-{nanos}-{pid}-{worker_id}-{seq}");
    let path = dir.join(format!("{stem}.arrows"));
    let tmp_path = dir.join(format!("{stem}.arrows.tmp"));

    // Everything that touches the temporary file runs inside this closure so
    // the file handle is closed before the rename or the cleanup below.
    let written = (|| -> Result<(), ClickHouseError> {
        let file = File::create(&tmp_path)
            .map_err(|e| ClickHouseError::Config(format!("cannot create spill file: {e}")))?;
        let mut bw = BufWriter::new(file);
        {
            let mut writer =
                StreamWriter::try_new(&mut bw, &batch.schema()).map_err(ClickHouseError::Arrow)?;
            writer.write(batch).map_err(ClickHouseError::Arrow)?;
            writer.finish().map_err(ClickHouseError::Arrow)?;
        }
        // Flush explicitly: dropping a BufWriter swallows flush errors.
        bw.flush()
            .map_err(|e| ClickHouseError::Config(format!("spill flush failed: {e}")))
    })();

    let finalized = written.and_then(|()| {
        fs::rename(&tmp_path, &path)
            .map_err(|e| ClickHouseError::Config(format!("cannot finalize spill file: {e}")))
    });

    match finalized {
        Ok(()) => Ok(path),
        Err(e) => {
            // Best effort: the original error is more useful than a cleanup error.
            let _ = fs::remove_file(&tmp_path);
            Err(e)
        }
    }
}
fn send_batch_http(
agent: &ureq::Agent,
config: &ClickHouseConfig,
batch: &RecordBatch,
) -> Result<(), ClickHouseError> {
let mut buf = Vec::new();
{
let mut writer =
StreamWriter::try_new(&mut buf, &batch.schema()).map_err(ClickHouseError::Arrow)?;
writer.write(batch).map_err(ClickHouseError::Arrow)?;
writer.finish().map_err(ClickHouseError::Arrow)?;
}
let query = format!("INSERT INTO {} FORMAT ArrowStream", config.table);
let url = format!("{}/?query={}", config.url, query);
let mut request = agent
.post(&url)
.set("Content-Type", "application/octet-stream");
if let Some(user) = &config.username {
request = request.set("X-ClickHouse-User", user);
}
if let Some(pw) = &config.password {
request = request.set("X-ClickHouse-Key", pw);
}
if let Some(db) = &config.database {
request = request.set("X-ClickHouse-Database", db);
}
let body = compress_if_enabled(config, buf)?;
if config.gzip && cfg!(feature = "gzip") {
request = request.set("Content-Encoding", "gzip");
}
let response = request.send_bytes(&body);
match response {
Ok(resp) => {
let status = resp.status();
if status == 200 {
Ok(())
} else {
let body = resp.into_string().unwrap_or_default();
Err(ClickHouseError::Http(format!("HTTP {}: {}", status, body)))
}
}
Err(e) => Err(ClickHouseError::Http(e.to_string())),
}
}
/// Gzip build of `compress_if_enabled`.
///
/// Returns `buf` unchanged when `config.gzip` is off; otherwise returns the
/// gzip-compressed bytes produced with the default compression level.
/// Encoder failures are reported as `ClickHouseError::Http`.
#[cfg(feature = "gzip")]
fn compress_if_enabled(
    config: &ClickHouseConfig,
    buf: Vec<u8>,
) -> Result<Vec<u8>, ClickHouseError> {
    use flate2::write::GzEncoder;
    use flate2::Compression;
    use std::io::Write;

    if config.gzip {
        let sink = Vec::with_capacity(buf.len());
        let mut gz = GzEncoder::new(sink, Compression::default());
        if let Err(e) = gz.write_all(&buf) {
            return Err(ClickHouseError::Http(format!("gzip encode: {e}")));
        }
        match gz.finish() {
            Ok(compressed) => Ok(compressed),
            Err(e) => Err(ClickHouseError::Http(format!("gzip finish: {e}"))),
        }
    } else {
        Ok(buf)
    }
}
/// Build of `compress_if_enabled` used when the `gzip` feature is disabled:
/// returns `buf` unchanged and ignores the configuration, including
/// `config.gzip`.
#[cfg(not(feature = "gzip"))]
fn compress_if_enabled(
_config: &ClickHouseConfig,
buf: Vec<u8>,
) -> Result<Vec<u8>, ClickHouseError> {
// Nothing to do without the gzip feature; always succeeds.
Ok(buf)
}