// clicktype-batch 0.2.0
//
// Async batching system for ClickType with backpressure and metrics.
// Documentation
//! Batcher implementation for high-throughput ingestion.
//!
//! The batcher accumulates rows in memory and flushes them to ClickHouse when:
//! 1. The number of rows reaches `max_rows`
//! 2. The buffer size reaches `max_buffer_size`
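//! 3. The periodic flush timer (period `max_wait`) ticks while rows are buffered,
//!    so buffered rows normally wait at most roughly `max_wait` before being flushed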
//!
//! # Architecture
//!
//! The batcher runs as a background worker task spawned via `tokio::spawn`.
//! It communicates with the application via a multi-producer single-consumer (MPSC) channel.
//!
//! # Memory Management
//!
//! The batcher uses a single reusable buffer to minimize allocations.
//! After each flush, if the buffer capacity exceeds `buffer_shrink_threshold`,
//! it is reallocated to `initial_buffer_size` to release memory back to the OS.

use std::marker::PhantomData;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, instrument, warn};
use clicktype_core::traits::{ClickInsertable, ClickTable};
use clicktype_transport::{Client, ClickTypeTransport};
use crate::config::{BatchConfig, FlushStats};
use crate::buffer::BatchBuffer;
use crate::error::{BatchError, Result};

/// Message types for the batcher worker.
///
/// Values of this enum travel over the MPSC channel from a `BatcherHandle`
/// to the worker loop, which matches on the variant to decide what to do.
enum BatchMessage<T> {
    /// Append a single row to the current batch.
    Insert(T),
    /// Append every row of the vector to the current batch.
    InsertMany(Vec<T>),
    /// Flush the current buffer; the flush outcome is sent back on the enclosed sender.
    Flush(oneshot::Sender<Result<FlushStats>>),
    /// Flush the current buffer, report the outcome on the enclosed sender,
    /// then stop the worker.
    Close(oneshot::Sender<Result<()>>),
}

/// A handle to the background batcher worker.
///
/// This handle is cheap to clone and can be shared across threads.
/// It provides methods to insert rows and control the batcher lifecycle.
///
/// Cloning only clones the underlying channel sender, so the handle is
/// `Clone` for every row type `T`, whether or not `T` itself is `Clone`.
pub struct BatcherHandle<T> {
    /// Sending half of the channel that feeds the worker task.
    sender: mpsc::Sender<BatchMessage<T>>,
}

// Implemented by hand instead of `#[derive(Clone)]`: the derive would add an
// implicit `T: Clone` bound, making the handle non-cloneable for row types
// that are not `Clone` even though only the channel sender is duplicated.
impl<T> Clone for BatcherHandle<T> {
    fn clone(&self) -> Self {
        Self {
            sender: self.sender.clone(),
        }
    }
}

impl<T: ClickInsertable + Send + 'static> BatcherHandle<T> {
    /// Insert a single row into the batch.
    ///
    /// This method waits if the internal channel is full (Backpressure).
    /// Use this for data integrity when you want to slow down the producer
    /// rather than dropping data.
    ///
    /// # Errors
    /// Returns an error if the batcher worker has terminated.
    pub async fn insert(&self, row: T) -> Result<()> {
        self.sender
            .send(BatchMessage::Insert(row))
            .await
            .map_err(|_| BatchError::ChannelClosed)
    }

    /// Try to insert a single row without blocking.
    ///
    /// This method fails immediately if the channel is full (Load Shedding).
    /// Use this for high-availability scenarios where dropping data is preferable
    /// to blocking the application.
    ///
    /// # Errors
    /// Returns `Channel full` if the channel is at capacity.
    /// Returns `Channel closed` if the worker has terminated.
    pub fn try_insert(&self, row: T) -> Result<()> {
        self.sender
            .try_send(BatchMessage::Insert(row))
            .map_err(|e| match e {
                mpsc::error::TrySendError::Full(_) => BatchError::ChannelFull,
                mpsc::error::TrySendError::Closed(_) => BatchError::ChannelClosed,
            })
    }

    /// Insert multiple rows into the batch.
    ///
    /// This treats the vector as a single unit for channel capacity purposes.
    pub async fn insert_many(&self, rows: Vec<T>) -> Result<()> {
        if rows.is_empty() {
            return Ok(());
        }
        self.sender
            .send(BatchMessage::InsertMany(rows))
            .await
            .map_err(|_| BatchError::ChannelClosed)
    }

    /// Force a flush of the current buffer and wait for completion.
    ///
    /// This ensures all data currently in the buffer is sent to ClickHouse.
    pub async fn flush(&self) -> Result<FlushStats> {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(BatchMessage::Flush(tx))
            .await
            .map_err(|_| BatchError::ChannelClosed)?;
        rx.await.map_err(|_| BatchError::ChannelClosed)?
    }

    /// Close the batcher and flush remaining data.
    ///
    /// This signals the worker to finish processing pending items, flush the buffer,
    /// and then terminate.
    pub async fn close(self) -> Result<()> {
        let (tx, rx) = oneshot::channel();
        self.sender
            .send(BatchMessage::Close(tx))
            .await
            .map_err(|_| BatchError::ChannelClosed)?;
        rx.await.map_err(|_| BatchError::ChannelClosed)?
    }
}

/// The main entry point for batch ingestion.
///
/// `GenericBatcher` is responsible for configuring and spawning the background worker.
/// It is generic over the transport implementation `C`.
///
/// `T` is the row type being ingested; no `T` value is stored in the struct itself.
pub struct GenericBatcher<T: ClickInsertable, C: ClickTypeTransport> {
    // Batching configuration read by the worker (channel capacity, flush
    // thresholds, retry settings, buffer sizing).
    config: BatchConfig,
    // Transport used by the worker for schema validation and binary inserts.
    client: C,
    // Ties the batcher to the row type `T` without holding a value of it.
    _phantom: PhantomData<T>,
}

/// Default Batcher alias using the standard Client.
///
/// Equivalent to `GenericBatcher<T, Client>`; use `GenericBatcher` directly
/// to plug in a different transport implementation.
pub type Batcher<T> = GenericBatcher<T, Client>;

impl<T, C> GenericBatcher<T, C>
where
    T: ClickInsertable + ClickTable + Send + Sync + 'static,
    C: ClickTypeTransport + Send + Sync + 'static,
{
    /// Create a new batcher instance.
    ///
    /// # Arguments
    /// * `client` - Configured ClickType client (or mock)
    /// * `config` - Batching configuration
    pub fn new(client: C, config: BatchConfig) -> Self {
        Self {
            config,
            client,
            _phantom: PhantomData,
        }
    }

    /// Spawn the batcher worker task.
    ///
    /// Returns a tuple containing:
    /// 1. `BatcherHandle`: Used to insert data.
    /// 2. `JoinHandle`: Used to supervise the worker task (detect panics or wait for completion).
    ///
    /// # Example
    /// ```rust,ignore
    /// let (handle, worker) = batcher.spawn();
    /// handle.insert(data).await?;
    /// handle.close().await?;
    /// worker.await?; // Ensure clean exit
    /// ```
    pub fn spawn(self) -> (BatcherHandle<T>, JoinHandle<()>) {
        let (tx, rx) = mpsc::channel(self.config.channel_capacity);
        let handle = BatcherHandle { sender: tx };

        let join_handle = tokio::spawn(self.worker_loop(rx));

        (handle, join_handle)
    }

    /// Worker loop that processes inserts and flushes
    #[instrument(skip(self, rx), fields(table = T::table_name()), name = "batcher_worker")]
    async fn worker_loop(self, mut rx: mpsc::Receiver<BatchMessage<T>>) {
        debug!("Batcher worker started");

        // Wrap logic to ensure we log exit
        let _result = async {
            let mut buffer = BatchBuffer::new(self.config.initial_buffer_size);
            let mut row_count = 0usize;
            let mut batch_start: Option<Instant> = None;
            let mut flush_interval = tokio::time::interval(self.config.max_wait);
            let mut schema_validated = false;

            loop {
                tokio::select! {
                    msg = rx.recv() => {
                        match msg {
                            Some(BatchMessage::Insert(row)) => {
                                // Validate schema on first insert
                                if !schema_validated {
                                    // Use the schema from T (ClickTable)
                                    let schema = T::schema();
                                    match self.client.validate_schema(T::table_name(), &schema).await {
                                        Ok(()) => {
                                            info!("Schema validation passed for table {}", T::table_name());
                                            schema_validated = true;
                                        }
                                        Err(e) => {
                                            error!(error = %e, "CRITICAL: Schema validation failed, dropping insert");
                                            continue;
                                        }
                                    }
                                }

                                if batch_start.is_none() {
                                    batch_start = Some(Instant::now());
                                }

                                if let Err(e) = row.write_row(&mut buffer) {
                                    error!(error = %e, "Failed to serialize row");
                                    continue;
                                }

                                row_count += 1;

                                if row_count >= self.config.max_rows
                                    || buffer.len() >= self.config.max_buffer_size
                                {
                                    let _ = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
                                }
                            }

                            Some(BatchMessage::InsertMany(rows)) => {
                                // Validate schema on first insert
                                if !schema_validated {
                                    let schema = T::schema();
                                    match self.client.validate_schema(T::table_name(), &schema).await {
                                        Ok(()) => {
                                            info!("Schema validation passed for table {}", T::table_name());
                                            schema_validated = true;
                                        }
                                        Err(e) => {
                                            error!(error = %e, "CRITICAL: Schema validation failed, dropping batch");
                                            continue;
                                        }
                                    }
                                }

                                if batch_start.is_none() {
                                    batch_start = Some(Instant::now());
                                }

                                for row in rows {
                                    if let Err(e) = row.write_row(&mut buffer) {
                                        error!(error = %e, "Failed to serialize row in batch");
                                        continue;
                                    }
                                    row_count += 1;
                                }

                                if row_count >= self.config.max_rows {
                                    let _ = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
                                }
                            }

                            Some(BatchMessage::Flush(respond)) => {
                                let result = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
                                let _ = respond.send(result);
                            }

                            Some(BatchMessage::Close(respond)) => {
                                let result = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
                                let _ = respond.send(result.map(|_| ()));
                                return; // Exit loop
                            }

                            None => {
                                // Channel closed
                                let _ = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
                                return; // Exit loop
                            }
                        }
                    }

                    _ = flush_interval.tick() => {
                        if row_count > 0 {
                            let _ = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
                        }
                    }
                }
            }
        }.await;

        info!("Batcher worker stopped");
    }

    /// Flush the buffer to ClickHouse
    #[instrument(skip(self, buffer, row_count, batch_start), fields(rows = *row_count, bytes = buffer.len()))]
    async fn flush_buffer(
        &self,
        buffer: &mut BatchBuffer,
        row_count: &mut usize,
        batch_start: &mut Option<Instant>,
    ) -> Result<FlushStats> {
        if *row_count == 0 {
            return Ok(FlushStats {
                rows_flushed: 0,
                bytes_sent: 0,
                duration: Duration::ZERO,
                batch_age: Duration::ZERO,
            });
        }

        let flush_start = Instant::now();
        let batch_age = batch_start.map(|s| s.elapsed()).unwrap_or(Duration::ZERO);
        let bytes_to_send = buffer.len();
        let rows_to_flush = *row_count;
        let table_name = T::table_name();

        debug!(rows = rows_to_flush, bytes = bytes_to_send, table = %table_name, "Starting flush to ClickHouse");

        // Try insert with retries
        let mut last_error = None;
        for attempt in 0..=self.config.max_retries {
            if attempt > 0 {
                warn!(attempt, max_retries = self.config.max_retries, "Retrying flush...");
            }

            match self.client.insert_binary(table_name, buffer.as_slice()).await {
                Ok(()) => {
                    let duration = flush_start.elapsed();
                    let stats = FlushStats {
                        rows_flushed: rows_to_flush,
                        bytes_sent: bytes_to_send,
                        duration,
                        batch_age,
                    };

                    info!(
                        rows = stats.rows_flushed,
                        bytes = stats.bytes_sent,
                        duration_ms = stats.duration.as_millis(),
                        "Batch flush successful"
                    );

                    buffer.smart_clear(
                        self.config.buffer_shrink_threshold,
                        self.config.initial_buffer_size
                    );
                    *row_count = 0;
                    *batch_start = None;

                    return Ok(stats);
                }
                Err(e) => {
                    let err = BatchError::FlushError(e.to_string());
                    last_error = Some(err);
                    error!(error = ?last_error, attempt, "Flush attempt failed");
                    if attempt < self.config.max_retries {
                        let delay = self.config.retry_base_delay * 2u32.pow(attempt);
                        tokio::time::sleep(delay).await;
                    }
                }
            }
        }

        let error_msg = format!("Insert failed after {} retries. Last error: {:?}", self.config.max_retries, last_error);
        
        error!(
            error = %error_msg,
            dropped_rows = rows_to_flush,
            "CRITICAL: Dropping batch due to persistent error"
        );

        // CRITICAL: Clear buffer to prevent "poisoned buffer" scenario where bad data
        // blocks all future inserts forever.
        buffer.smart_clear(
            self.config.buffer_shrink_threshold,
            self.config.initial_buffer_size
        );
        *row_count = 0;
        *batch_start = None;

        Err(BatchError::FlushError(error_msg))
    }
}