Skip to main content

rustsim_io/
clickhouse.rs

1//! ClickHouse writer with bounded-channel backpressure.
2//!
3//! [`ClickHouseWriter`] runs a background thread that receives Arrow
4//! [`RecordBatch`] values over a bounded
5//! `sync_channel`, serializes them to Arrow IPC stream format, and POSTs
6//! them to ClickHouse via HTTP.
7//!
8//! # Backpressure
9//!
10//! The bounded channel blocks the sender when the buffer is full, preventing
11//! the simulation from outrunning the writer.
12//!
13//! # Delivery semantics
14//!
15//! Delivery is tracked in three stages:
//! - received by a background worker (dequeued from the writer channel)
17//! - successfully sent to ClickHouse
18//! - dropped after all retries are exhausted
19//!
20//! The writer exposes both live snapshots and final shutdown statistics so
21//! callers can distinguish enqueue success from durable delivery success.
22//!
23//! # Retry
24//!
25//! Failed HTTP requests are retried with exponential backoff and full jitter:
26//!
27//! ```text
28//! delay = random(0, min(max_retry_delay, base_retry_delay x 2^(attempt-1)))
29//! ```
30
31use arrow_array::RecordBatch;
32use arrow_ipc::writer::StreamWriter;
33use rand::Rng;
34use std::sync::atomic::{AtomicU64, Ordering};
35use std::sync::{mpsc, Arc, Mutex};
36use std::thread::{self, JoinHandle};
37use std::time::Duration;
38use thiserror::Error;
39use tracing::{debug, error, info, warn};
40
41/// Errors that can occur in the ClickHouse writer.
42#[derive(Debug, Error)]
43pub enum ClickHouseError {
44    /// Invalid writer configuration.
45    #[error("invalid writer configuration: {0}")]
46    Config(String),
47    /// The writer channel was closed unexpectedly.
48    #[error("writer channel closed: {0}")]
49    ChannelClosed(String),
50    /// An HTTP request to ClickHouse failed.
51    #[error("HTTP request failed: {0}")]
52    Http(String),
53    /// Arrow serialization failed.
54    #[error("Arrow serialization failed: {0}")]
55    Arrow(#[from] arrow_schema::ArrowError),
56}
57
/// Configuration for the ClickHouse writer.
///
/// Build one with [`ClickHouseConfig::new`] (or `Default`) plus the `with_*`
/// builder methods, then hand it to `ClickHouseWriter::new`.
#[derive(Debug, Clone)]
pub struct ClickHouseConfig {
    /// ClickHouse HTTP endpoint (e.g. `http://localhost:8123` or `https://...`).
    ///
    /// HTTPS requires the `tls` feature (enabled by default).
    pub url: String,
    /// Target table name for `INSERT INTO ... FORMAT ArrowStream`.
    ///
    /// The name is spliced into the SQL text verbatim (it is neither quoted
    /// nor escaped), so it must come from trusted configuration.
    pub table: String,
    /// Optional target database. When set, sent via `X-ClickHouse-Database`.
    pub database: Option<String>,
    /// Optional ClickHouse username. When set, sent via `X-ClickHouse-User`.
    pub username: Option<String>,
    /// Optional ClickHouse password/key. When set, sent via `X-ClickHouse-Key`.
    pub password: Option<String>,
    /// Advisory target batch size in rows.
    ///
    /// This field is an **advisory hint** for upstream batching code (such as
    /// [`CollectArrowBridge`](crate::bridge::CollectArrowBridge) or
    /// [`TelemetryPipeline`](https://docs.rs/rustsim)) that decide when to
    /// flush a partially-filled Arrow batch toward the writer. The writer
    /// itself does **not** split, merge, or reject batches based on this
    /// value — whatever `RecordBatch` is handed to
    /// [`ClickHouseWriter::send`] is serialized and sent as-is.
    ///
    /// Keep this in sync with the `batch_size` you pass to
    /// `CollectArrowBridge::new`, or read it back from the config to drive
    /// that call.
    pub max_batch_rows: usize,
    /// Maximum number of send **attempts** per batch, including the first.
    ///
    /// Despite the name, this counts attempts rather than retries: `3` means
    /// one initial attempt plus up to two retries. It must be at least `1`;
    /// the writer constructor rejects `0`.
    pub max_retries: u32,
    /// Base delay for exponential backoff (default 100 ms).
    ///
    /// After failed attempt `N` (1-based), the worker sleeps for a duration
    /// drawn uniformly from `[0, min(max_retry_delay, base_retry_delay x 2^(N-1))]`
    /// ("full jitter"). No sleep follows the final attempt.
    pub base_retry_delay: Duration,
    /// Maximum backoff cap (default 10 s).
    ///
    /// The exponential delay is clamped to this value before the jittered
    /// sleep is drawn, so no single sleep exceeds it.
    pub max_retry_delay: Duration,
    /// TCP connect timeout (default 10 s).
    pub connect_timeout: Duration,
    /// HTTP read timeout per request (default 60 s).
    pub read_timeout: Duration,
    /// If true and the `gzip` feature is enabled, compress request bodies
    /// and send `Content-Encoding: gzip`.
    ///
    /// Has no effect when the `gzip` feature is disabled.
    pub gzip: bool,
    /// Number of background writer threads draining the channel in parallel.
    ///
    /// Defaults to `1` (single-threaded writer preserving submission order);
    /// values below `1` are treated as `1`. Values greater than `1` allow
    /// multiple concurrent HTTP requests to ClickHouse, which is useful when
    /// per-batch request latency is the bottleneck. Ordering between batches
    /// is **not** preserved with `workers > 1`; if downstream consumers rely
    /// on the submission order, keep this at `1`.
    pub workers: usize,
    /// Optional directory where batches that fail all retries are persisted
    /// to disk as Arrow IPC files.
    ///
    /// Requires the `spill` feature. When set, a batch that exhausts every
    /// attempt is written to
    /// `spill_dir/batch-{unix_nanos}-{pid}-{worker_id}-{seq}.arrows` instead
    /// of being silently dropped. Replay from disk is not automatic and is
    /// left to the operator (e.g. a separate ingest job).
    ///
    /// When the feature is disabled this field is ignored.
    pub spill_dir: Option<std::path::PathBuf>,
}
126
127impl Default for ClickHouseConfig {
128    fn default() -> Self {
129        Self {
130            url: "http://localhost:8123".to_string(),
131            table: "agent_data".to_string(),
132            database: None,
133            username: None,
134            password: None,
135            max_batch_rows: 100_000,
136            max_retries: 3,
137            base_retry_delay: Duration::from_millis(100),
138            max_retry_delay: Duration::from_secs(10),
139            connect_timeout: Duration::from_secs(10),
140            read_timeout: Duration::from_secs(60),
141            gzip: false,
142            workers: 1,
143            spill_dir: None,
144        }
145    }
146}
147
148impl ClickHouseConfig {
149    /// Create a config with the given endpoint URL and table name.
150    ///
151    /// All other settings use sensible defaults and can be overridden with the
152    /// builder-style methods below.
153    pub fn new(url: impl Into<String>, table: impl Into<String>) -> Self {
154        Self {
155            url: url.into(),
156            table: table.into(),
157            ..Self::default()
158        }
159    }
160
161    /// Override the advisory batch-row target (see
162    /// [`max_batch_rows`](Self::max_batch_rows)).
163    pub fn with_max_batch_rows(mut self, max_batch_rows: usize) -> Self {
164        self.max_batch_rows = max_batch_rows;
165        self
166    }
167
168    /// Override the maximum retry count.
169    pub fn with_max_retries(mut self, max_retries: u32) -> Self {
170        self.max_retries = max_retries;
171        self
172    }
173
174    /// Override the base retry delay.
175    pub fn with_base_retry_delay(mut self, base_retry_delay: Duration) -> Self {
176        self.base_retry_delay = base_retry_delay;
177        self
178    }
179
180    /// Override the maximum retry delay.
181    pub fn with_max_retry_delay(mut self, max_retry_delay: Duration) -> Self {
182        self.max_retry_delay = max_retry_delay;
183        self
184    }
185
186    /// Set ClickHouse authentication credentials (sent via
187    /// `X-ClickHouse-User` / `X-ClickHouse-Key` headers).
188    pub fn with_auth(mut self, username: impl Into<String>, password: impl Into<String>) -> Self {
189        self.username = Some(username.into());
190        self.password = Some(password.into());
191        self
192    }
193
194    /// Set the target database (sent via `X-ClickHouse-Database`).
195    pub fn with_database(mut self, database: impl Into<String>) -> Self {
196        self.database = Some(database.into());
197        self
198    }
199
200    /// Set TCP connect and HTTP read timeouts.
201    pub fn with_timeouts(mut self, connect: Duration, read: Duration) -> Self {
202        self.connect_timeout = connect;
203        self.read_timeout = read;
204        self
205    }
206
207    /// Enable or disable gzip request-body compression.
208    ///
209    /// Requires the `gzip` Cargo feature to have an effect.
210    pub fn with_gzip(mut self, enabled: bool) -> Self {
211        self.gzip = enabled;
212        self
213    }
214
215    /// Override the number of concurrent background writer threads.
216    ///
217    /// See [`workers`](Self::workers) for caveats about ordering.
218    /// Values less than 1 are clamped to 1.
219    pub fn with_workers(mut self, workers: usize) -> Self {
220        self.workers = workers.max(1);
221        self
222    }
223
224    /// Enable on-disk spill for undelivered batches.
225    ///
226    /// Requires the `spill` Cargo feature to have an effect. The directory
227    /// is created on first spill if it does not already exist.
228    pub fn with_spill_dir(mut self, dir: impl Into<std::path::PathBuf>) -> Self {
229        self.spill_dir = Some(dir.into());
230        self
231    }
232}
233
234/// Message type for the writer channel.
235pub enum WriterMessage {
236    /// A batch to write to ClickHouse.
237    Batch(RecordBatch),
238    /// Signal the writer to shut down after draining remaining batches.
239    Shutdown,
240}
241
242/// Background ClickHouse writer with bounded-channel backpressure.
243///
244/// Created via [`ClickHouseWriter::new`]. Batches are sent to the background
245/// thread via [`send`](Self::send). Call [`shutdown`](Self::shutdown) to drain
246/// the channel and wait for completion.
247pub struct ClickHouseWriter {
248    sender: mpsc::SyncSender<WriterMessage>,
249    handles: Vec<JoinHandle<WriterStats>>,
250    metrics: Arc<WriterMetricsInner>,
251    workers: usize,
252}
253
/// Totals accumulated over a writer's lifetime, summed across all worker
/// threads and returned by `ClickHouseWriter::shutdown`.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct WriterStats {
    /// Batches a worker pulled off the channel.
    pub batches_received: u64,
    /// Rows contained in the batches a worker pulled off the channel.
    pub rows_received: u64,
    /// Batches ClickHouse accepted.
    pub batches_sent: u64,
    /// Rows contained in the batches ClickHouse accepted.
    pub rows_sent: u64,
    /// Failed HTTP attempts, with every retry counted individually.
    pub errors: u64,
    /// Batches given up on after the last attempt failed (spilled or not).
    pub batches_failed: u64,
    /// Rows contained in the batches given up on.
    pub rows_failed: u64,
    /// The part of `batches_failed` that was written to
    /// [`ClickHouseConfig::spill_dir`]. Stays `0` unless the `spill` feature
    /// is enabled and a directory is configured.
    pub batches_spilled: u64,
    /// Rows contained in the spilled batches.
    pub rows_spilled: u64,
    /// Peak number of batches submitted via `send` but not yet dequeued.
    pub max_outstanding_batches: u64,
}
281
/// Live snapshot of background-writer metrics.
///
/// Each counter is loaded independently with relaxed ordering. Individual
/// values are accurate, but they are not guaranteed to be mutually
/// consistent while workers are active.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct WriterMetricsSnapshot {
    /// Number of batches currently submitted to the writer but not yet dequeued.
    pub outstanding_batches: u64,
    /// Number of batches accepted by the background writer loop.
    pub batches_received: u64,
    /// Total number of rows accepted by the background writer loop.
    pub rows_received: u64,
    /// Number of batches successfully sent.
    pub batches_sent: u64,
    /// Total number of rows across all successful batches.
    pub rows_sent: u64,
    /// Total number of failed send attempts (including retries).
    pub errors: u64,
    /// Number of batches dropped after all retries were exhausted.
    pub batches_failed: u64,
    /// Total number of rows dropped after all retries were exhausted.
    pub rows_failed: u64,
    /// Number of batches written to the spill directory.
    pub batches_spilled: u64,
    /// Total number of rows written to the spill directory.
    pub rows_spilled: u64,
    /// Maximum observed outstanding-batch count submitted to the writer.
    pub max_outstanding_batches: u64,
}

/// Lock-free counters shared between the writer handle and all worker
/// threads; the field meanings mirror [`WriterMetricsSnapshot`].
#[derive(Default)]
struct WriterMetricsInner {
    outstanding_batches: AtomicU64,
    batches_received: AtomicU64,
    rows_received: AtomicU64,
    batches_sent: AtomicU64,
    rows_sent: AtomicU64,
    errors: AtomicU64,
    batches_failed: AtomicU64,
    rows_failed: AtomicU64,
    batches_spilled: AtomicU64,
    rows_spilled: AtomicU64,
    max_outstanding_batches: AtomicU64,
}

impl WriterMetricsInner {
    /// Load every counter into a plain [`WriterMetricsSnapshot`].
    fn snapshot(&self) -> WriterMetricsSnapshot {
        WriterMetricsSnapshot {
            outstanding_batches: self.outstanding_batches.load(Ordering::Relaxed),
            batches_received: self.batches_received.load(Ordering::Relaxed),
            rows_received: self.rows_received.load(Ordering::Relaxed),
            batches_sent: self.batches_sent.load(Ordering::Relaxed),
            rows_sent: self.rows_sent.load(Ordering::Relaxed),
            errors: self.errors.load(Ordering::Relaxed),
            batches_failed: self.batches_failed.load(Ordering::Relaxed),
            rows_failed: self.rows_failed.load(Ordering::Relaxed),
            batches_spilled: self.batches_spilled.load(Ordering::Relaxed),
            rows_spilled: self.rows_spilled.load(Ordering::Relaxed),
            max_outstanding_batches: self.max_outstanding_batches.load(Ordering::Relaxed),
        }
    }

    /// Raise the outstanding-batch high-water mark to `value` if it is larger.
    fn update_max_outstanding(&self, value: u64) {
        // `fetch_max` is the standard-library form of the compare-exchange
        // "store if greater" loop, performed atomically.
        self.max_outstanding_batches
            .fetch_max(value, Ordering::Relaxed);
    }
}
356
357impl ClickHouseWriter {
358    /// Create a new writer with bounded channel of `capacity` batches for backpressure.
359    ///
360    /// Spawns a background thread that drains the channel and sends batches
361    /// to ClickHouse.
362    pub fn new(config: ClickHouseConfig, capacity: usize) -> Result<Self, ClickHouseError> {
363        if capacity == 0 {
364            return Err(ClickHouseError::Config(
365                "channel capacity must be positive".to_string(),
366            ));
367        }
368        if config.max_retries == 0 {
369            return Err(ClickHouseError::Config(
370                "max_retries must be positive".to_string(),
371            ));
372        }
373        if config.url.trim().is_empty() {
374            return Err(ClickHouseError::Config("url must not be empty".to_string()));
375        }
376        if config.table.trim().is_empty() {
377            return Err(ClickHouseError::Config(
378                "table must not be empty".to_string(),
379            ));
380        }
381
382        let (sender, receiver) = mpsc::sync_channel::<WriterMessage>(capacity);
383        let metrics = Arc::new(WriterMetricsInner::default());
384        let workers = config.workers.max(1);
385        let receiver = Arc::new(Mutex::new(receiver));
386
387        let mut handles = Vec::with_capacity(workers);
388        for worker_id in 0..workers {
389            let metrics_for_thread = Arc::clone(&metrics);
390            let receiver_for_thread = Arc::clone(&receiver);
391            let config_for_thread = config.clone();
392            let handle = thread::spawn(move || {
393                let mut stats = WriterStats::default();
394                writer_loop(
395                    worker_id,
396                    &config_for_thread,
397                    &receiver_for_thread,
398                    &mut stats,
399                    &metrics_for_thread,
400                );
401                stats
402            });
403            handles.push(handle);
404        }
405
406        Ok(Self {
407            sender,
408            handles,
409            metrics,
410            workers,
411        })
412    }
413
414    /// Send a batch to the background writer.
415    ///
416    /// Blocks if the channel is full (backpressure).
417    pub fn send(&self, batch: RecordBatch) -> Result<(), ClickHouseError> {
418        let outstanding = self
419            .metrics
420            .outstanding_batches
421            .fetch_add(1, Ordering::Relaxed)
422            + 1;
423        self.metrics.update_max_outstanding(outstanding);
424
425        if let Err(e) = self.sender.send(WriterMessage::Batch(batch)) {
426            self.metrics
427                .outstanding_batches
428                .fetch_sub(1, Ordering::Relaxed);
429            return Err(ClickHouseError::ChannelClosed(e.to_string()));
430        }
431
432        Ok(())
433    }
434
435    /// Snapshot current writer metrics without shutting the writer down.
436    pub fn metrics(&self) -> WriterMetricsSnapshot {
437        self.metrics.snapshot()
438    }
439
440    /// Shutdown the writer, drain remaining batches, and return statistics.
441    pub fn shutdown(mut self) -> WriterStats {
442        // Send one Shutdown per worker so every worker thread unblocks.
443        for _ in 0..self.workers {
444            let _ = self.sender.send(WriterMessage::Shutdown);
445        }
446        let mut merged = WriterStats::default();
447        for h in self.handles.drain(..) {
448            if let Ok(stats) = h.join() {
449                merged.batches_received += stats.batches_received;
450                merged.rows_received += stats.rows_received;
451                merged.batches_sent += stats.batches_sent;
452                merged.rows_sent += stats.rows_sent;
453                merged.errors += stats.errors;
454                merged.batches_failed += stats.batches_failed;
455                merged.rows_failed += stats.rows_failed;
456                merged.batches_spilled += stats.batches_spilled;
457                merged.rows_spilled += stats.rows_spilled;
458                merged.max_outstanding_batches = merged
459                    .max_outstanding_batches
460                    .max(stats.max_outstanding_batches);
461            }
462        }
463        merged
464    }
465}
466
467fn writer_loop(
468    worker_id: usize,
469    config: &ClickHouseConfig,
470    receiver: &Mutex<mpsc::Receiver<WriterMessage>>,
471    stats: &mut WriterStats,
472    metrics: &WriterMetricsInner,
473) {
474    info!(
475        worker_id,
476        workers = config.workers,
477        table = %config.table,
478        url = %config.url,
479        database = ?config.database,
480        auth = config.username.is_some(),
481        gzip = config.gzip && cfg!(feature = "gzip"),
482        "ClickHouse writer started"
483    );
484
485    let agent = ureq::AgentBuilder::new()
486        .timeout_connect(config.connect_timeout)
487        .timeout_read(config.read_timeout)
488        .build();
489
490    let mut rng = rand::thread_rng();
491
492    loop {
493        // Briefly lock the shared receiver to dequeue a single message.
494        // HTTP send and retry backoff happen outside the lock so multiple
495        // workers can fan out concurrently.
496        let msg = match receiver.lock() {
497            Ok(guard) => guard.recv(),
498            Err(poisoned) => poisoned.into_inner().recv(),
499        };
500        let batch = match msg {
501            Ok(WriterMessage::Batch(b)) => b,
502            Ok(WriterMessage::Shutdown) => break,
503            Err(_) => break,
504        };
505
506        metrics.outstanding_batches.fetch_sub(1, Ordering::Relaxed);
507        let rows = batch.num_rows() as u64;
508        stats.batches_received += 1;
509        stats.rows_received += rows;
510        metrics.batches_received.fetch_add(1, Ordering::Relaxed);
511        metrics.rows_received.fetch_add(rows, Ordering::Relaxed);
512
513        let mut success = false;
514        for attempt in 1..=config.max_retries {
515            match send_batch_http(&agent, config, &batch) {
516                Ok(()) => {
517                    debug!(worker_id, rows, attempt, "batch sent successfully");
518                    success = true;
519                    break;
520                }
521                Err(e) => {
522                    stats.errors += 1;
523                    metrics.errors.fetch_add(1, Ordering::Relaxed);
524
525                    if attempt < config.max_retries {
526                        let delay = backoff_with_jitter(
527                            config.base_retry_delay,
528                            config.max_retry_delay,
529                            attempt,
530                            &mut rng,
531                        );
532                        warn!(
533                            worker_id,
534                            attempt,
535                            max_retries = config.max_retries,
536                            delay_ms = delay.as_millis() as u64,
537                            error = %e,
538                            "batch send failed, retrying after backoff"
539                        );
540                        thread::sleep(delay);
541                    } else {
542                        warn!(
543                            worker_id,
544                            attempt,
545                            max_retries = config.max_retries,
546                            error = %e,
547                            "batch send failed, no retries remaining"
548                        );
549                    }
550                }
551            }
552        }
553        if success {
554            stats.batches_sent += 1;
555            stats.rows_sent += rows;
556            metrics.batches_sent.fetch_add(1, Ordering::Relaxed);
557            metrics.rows_sent.fetch_add(rows, Ordering::Relaxed);
558        } else {
559            stats.batches_failed += 1;
560            stats.rows_failed += rows;
561            metrics.batches_failed.fetch_add(1, Ordering::Relaxed);
562            metrics.rows_failed.fetch_add(rows, Ordering::Relaxed);
563
564            #[cfg(feature = "spill")]
565            if let Some(dir) = config.spill_dir.as_ref() {
566                match spill_batch_to_disk(dir, worker_id, &batch) {
567                    Ok(path) => {
568                        stats.batches_spilled += 1;
569                        stats.rows_spilled += rows;
570                        metrics.batches_spilled.fetch_add(1, Ordering::Relaxed);
571                        metrics.rows_spilled.fetch_add(rows, Ordering::Relaxed);
572                        warn!(worker_id, rows, path = %path.display(), "batch spilled to disk after all retries exhausted");
573                    }
574                    Err(e) => {
575                        error!(worker_id, rows, error = %e, "batch dropped: retries exhausted AND spill failed");
576                    }
577                }
578            } else {
579                error!(worker_id, rows, "batch dropped after all retries exhausted");
580            }
581            #[cfg(not(feature = "spill"))]
582            error!(worker_id, rows, "batch dropped after all retries exhausted");
583        }
584    }
585
586    stats.max_outstanding_batches = metrics.max_outstanding_batches.load(Ordering::Relaxed);
587
588    info!(
589        worker_id,
590        batches_sent = stats.batches_sent,
591        rows_sent = stats.rows_sent,
592        errors = stats.errors,
593        "ClickHouse writer shut down"
594    );
595}
596
597/// Compute exponential backoff with full jitter.
598///
599/// delay = random(0, min(max_delay, base * 2^(attempt-1)))
600fn backoff_with_jitter(
601    base: Duration,
602    max_delay: Duration,
603    attempt: u32,
604    rng: &mut impl Rng,
605) -> Duration {
606    let exp = base.saturating_mul(1u32 << (attempt - 1).min(30));
607    let capped = exp.min(max_delay);
608    let jitter_nanos = rng.gen_range(0..=capped.as_nanos().min(u64::MAX as u128) as u64);
609    Duration::from_nanos(jitter_nanos)
610}
611
/// Persist a RecordBatch to disk as an Arrow IPC stream file.
///
/// Filenames are unique per process: `batch-{nanos}-{pid}-{worker}-{seq}.arrows`,
/// where `seq` comes from a process-wide counter. Creates `dir` if it does
/// not exist. Returns the full path of the finished file on success.
///
/// The batch is first written to a sibling `*.arrows.tmp` file, flushed and
/// fsynced, and only then renamed to its final `*.arrows` name. A replay job
/// scanning for `*.arrows` therefore never sees a half-written file. On any
/// failure the temporary file is removed (best effort).
///
/// # Errors
///
/// I/O failures are reported as [`ClickHouseError::Config`]; IPC encoding
/// failures as [`ClickHouseError::Arrow`].
#[cfg(feature = "spill")]
fn spill_batch_to_disk(
    dir: &std::path::Path,
    worker_id: usize,
    batch: &RecordBatch,
) -> Result<std::path::PathBuf, ClickHouseError> {
    use std::fs::{self, File};
    use std::io::BufWriter;
    use std::sync::atomic::AtomicU64;
    use std::time::{SystemTime, UNIX_EPOCH};

    static SPILL_SEQ: AtomicU64 = AtomicU64::new(0);

    fs::create_dir_all(dir)
        .map_err(|e| ClickHouseError::Config(format!("cannot create spill dir: {e}")))?;

    let nanos = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_nanos())
        .unwrap_or(0);
    let pid = std::process::id();
    let seq = SPILL_SEQ.fetch_add(1, Ordering::Relaxed);
    let stem = format!("batch-{nanos}-{pid}-{worker_id}-{seq}");
    let path = dir.join(format!("{stem}.arrows"));
    let tmp_path = dir.join(format!("{stem}.arrows.tmp"));

    let write_tmp = || -> Result<(), ClickHouseError> {
        let file = File::create(&tmp_path)
            .map_err(|e| ClickHouseError::Config(format!("cannot create spill file: {e}")))?;
        let mut bw = BufWriter::new(file);
        {
            let mut writer =
                StreamWriter::try_new(&mut bw, &batch.schema()).map_err(ClickHouseError::Arrow)?;
            writer.write(batch).map_err(ClickHouseError::Arrow)?;
            writer.finish().map_err(ClickHouseError::Arrow)?;
        }
        std::io::Write::flush(&mut bw)
            .map_err(|e| ClickHouseError::Config(format!("spill flush failed: {e}")))?;
        // Make the bytes durable before the rename publishes the file.
        bw.get_ref()
            .sync_all()
            .map_err(|e| ClickHouseError::Config(format!("spill fsync failed: {e}")))?;
        Ok(())
    };

    let outcome = write_tmp().and_then(|()| {
        fs::rename(&tmp_path, &path)
            .map_err(|e| ClickHouseError::Config(format!("cannot finalize spill file: {e}")))
    });
    if let Err(e) = outcome {
        // Best effort: don't leave a partial file behind.
        let _ = fs::remove_file(&tmp_path);
        return Err(e);
    }
    Ok(path)
}
653
654/// Serialize a RecordBatch to Arrow IPC stream format and POST it to ClickHouse.
655fn send_batch_http(
656    agent: &ureq::Agent,
657    config: &ClickHouseConfig,
658    batch: &RecordBatch,
659) -> Result<(), ClickHouseError> {
660    // Serialize batch to Arrow IPC stream bytes
661    let mut buf = Vec::new();
662    {
663        let mut writer =
664            StreamWriter::try_new(&mut buf, &batch.schema()).map_err(ClickHouseError::Arrow)?;
665        writer.write(batch).map_err(ClickHouseError::Arrow)?;
666        writer.finish().map_err(ClickHouseError::Arrow)?;
667    }
668
669    let query = format!("INSERT INTO {} FORMAT ArrowStream", config.table);
670    let url = format!("{}/?query={}", config.url, query);
671
672    let mut request = agent
673        .post(&url)
674        .set("Content-Type", "application/octet-stream");
675
676    if let Some(user) = &config.username {
677        request = request.set("X-ClickHouse-User", user);
678    }
679    if let Some(pw) = &config.password {
680        request = request.set("X-ClickHouse-Key", pw);
681    }
682    if let Some(db) = &config.database {
683        request = request.set("X-ClickHouse-Database", db);
684    }
685
686    let body = compress_if_enabled(config, buf)?;
687    if config.gzip && cfg!(feature = "gzip") {
688        request = request.set("Content-Encoding", "gzip");
689    }
690
691    let response = request.send_bytes(&body);
692
693    match response {
694        Ok(resp) => {
695            let status = resp.status();
696            if status == 200 {
697                Ok(())
698            } else {
699                let body = resp.into_string().unwrap_or_default();
700                Err(ClickHouseError::Http(format!("HTTP {}: {}", status, body)))
701            }
702        }
703        Err(e) => Err(ClickHouseError::Http(e.to_string())),
704    }
705}
706
/// Gzip `buf` when `config.gzip` is set; otherwise return it untouched.
///
/// Encoder failures are reported as [`ClickHouseError::Http`].
#[cfg(feature = "gzip")]
fn compress_if_enabled(
    config: &ClickHouseConfig,
    buf: Vec<u8>,
) -> Result<Vec<u8>, ClickHouseError> {
    use flate2::write::GzEncoder;
    use flate2::Compression;
    use std::io::Write;

    if config.gzip {
        // Pre-size the output to the input length; compressed data is
        // normally no larger.
        let mut gz = GzEncoder::new(Vec::with_capacity(buf.len()), Compression::default());
        gz.write_all(&buf)
            .map_err(|err| ClickHouseError::Http(format!("gzip encode: {err}")))?;
        gz.finish()
            .map_err(|err| ClickHouseError::Http(format!("gzip finish: {err}")))
    } else {
        Ok(buf)
    }
}
726
727#[cfg(not(feature = "gzip"))]
728fn compress_if_enabled(
729    _config: &ClickHouseConfig,
730    buf: Vec<u8>,
731) -> Result<Vec<u8>, ClickHouseError> {
732    Ok(buf)
733}