Skip to main content

clicktype_batch/
batcher.rs

1//! Batcher implementation for high-throughput ingestion.
2//!
3//! The batcher accumulates rows in memory and flushes them to ClickHouse when:
4//! 1. The number of rows reaches `max_rows`
5//! 2. The buffer size reaches `max_buffer_size`
//! 3. A periodic timer with period `max_wait` ticks while rows are buffered
//!    (so a row waits at most roughly `max_wait` before being flushed)
7//!
8//! # Architecture
9//!
10//! The batcher runs as a background worker task spawned via `tokio::spawn`.
11//! It communicates with the application via a multi-producer single-consumer (MPSC) channel.
12//!
13//! # Memory Management
14//!
15//! The batcher uses a single reusable buffer to minimize allocations.
16//! After each flush, if the buffer capacity exceeds `buffer_shrink_threshold`,
17//! it is reallocated to `initial_buffer_size` to release memory back to the OS.
18
19use std::marker::PhantomData;
20use std::time::{Duration, Instant};
21use tokio::sync::{mpsc, oneshot};
22use tokio::task::JoinHandle;
23use tracing::{debug, error, info, instrument, warn};
24use clicktype_core::traits::{ClickInsertable, ClickTable};
25use clicktype_transport::{Client, ClickTypeTransport};
26use crate::config::{BatchConfig, FlushStats};
27use crate::buffer::BatchBuffer;
28use crate::error::{BatchError, Result};
29
30/// Message types for the batcher worker
31enum BatchMessage<T> {
32    Insert(T),
33    InsertMany(Vec<T>),
34    Flush(oneshot::Sender<Result<FlushStats>>),
35    Close(oneshot::Sender<Result<()>>),
36}
37
38/// A handle to the background batcher worker.
39///
40/// This handle is cheap to clone and can be shared across threads.
41/// It provides methods to insert rows and control the batcher lifecycle.
42#[derive(Clone)]
43pub struct BatcherHandle<T> {
44    sender: mpsc::Sender<BatchMessage<T>>,
45}
46
47impl<T: ClickInsertable + Send + 'static> BatcherHandle<T> {
48    /// Insert a single row into the batch.
49    ///
50    /// This method waits if the internal channel is full (Backpressure).
51    /// Use this for data integrity when you want to slow down the producer
52    /// rather than dropping data.
53    ///
54    /// # Errors
55    /// Returns an error if the batcher worker has terminated.
56    pub async fn insert(&self, row: T) -> Result<()> {
57        self.sender
58            .send(BatchMessage::Insert(row))
59            .await
60            .map_err(|_| BatchError::ChannelClosed)
61    }
62
63    /// Try to insert a single row without blocking.
64    ///
65    /// This method fails immediately if the channel is full (Load Shedding).
66    /// Use this for high-availability scenarios where dropping data is preferable
67    /// to blocking the application.
68    ///
69    /// # Errors
70    /// Returns `Channel full` if the channel is at capacity.
71    /// Returns `Channel closed` if the worker has terminated.
72    pub fn try_insert(&self, row: T) -> Result<()> {
73        self.sender
74            .try_send(BatchMessage::Insert(row))
75            .map_err(|e| match e {
76                mpsc::error::TrySendError::Full(_) => BatchError::ChannelFull,
77                mpsc::error::TrySendError::Closed(_) => BatchError::ChannelClosed,
78            })
79    }
80
81    /// Insert multiple rows into the batch.
82    ///
83    /// This treats the vector as a single unit for channel capacity purposes.
84    pub async fn insert_many(&self, rows: Vec<T>) -> Result<()> {
85        if rows.is_empty() {
86            return Ok(());
87        }
88        self.sender
89            .send(BatchMessage::InsertMany(rows))
90            .await
91            .map_err(|_| BatchError::ChannelClosed)
92    }
93
94    /// Force a flush of the current buffer and wait for completion.
95    ///
96    /// This ensures all data currently in the buffer is sent to ClickHouse.
97    pub async fn flush(&self) -> Result<FlushStats> {
98        let (tx, rx) = oneshot::channel();
99        self.sender
100            .send(BatchMessage::Flush(tx))
101            .await
102            .map_err(|_| BatchError::ChannelClosed)?;
103        rx.await.map_err(|_| BatchError::ChannelClosed)?
104    }
105
106    /// Close the batcher and flush remaining data.
107    ///
108    /// This signals the worker to finish processing pending items, flush the buffer,
109    /// and then terminate.
110    pub async fn close(self) -> Result<()> {
111        let (tx, rx) = oneshot::channel();
112        self.sender
113            .send(BatchMessage::Close(tx))
114            .await
115            .map_err(|_| BatchError::ChannelClosed)?;
116        rx.await.map_err(|_| BatchError::ChannelClosed)?
117    }
118}
119
120/// The main entry point for batch ingestion.
121///
122/// `GenericBatcher` is responsible for configuring and spawning the background worker.
123/// It is generic over the transport implementation `C`.
124pub struct GenericBatcher<T: ClickInsertable, C: ClickTypeTransport> {
125    config: BatchConfig,
126    client: C,
127    _phantom: PhantomData<T>,
128}
129
130/// Default Batcher alias using the standard Client
131pub type Batcher<T> = GenericBatcher<T, Client>;
132
133impl<T, C> GenericBatcher<T, C>
134where
135    T: ClickInsertable + ClickTable + Send + Sync + 'static,
136    C: ClickTypeTransport + Send + Sync + 'static,
137{
138    /// Create a new batcher instance.
139    ///
140    /// # Arguments
141    /// * `client` - Configured ClickType client (or mock)
142    /// * `config` - Batching configuration
143    pub fn new(client: C, config: BatchConfig) -> Self {
144        Self {
145            config,
146            client,
147            _phantom: PhantomData,
148        }
149    }
150
151    /// Spawn the batcher worker task.
152    ///
153    /// Returns a tuple containing:
154    /// 1. `BatcherHandle`: Used to insert data.
155    /// 2. `JoinHandle`: Used to supervise the worker task (detect panics or wait for completion).
156    ///
157    /// # Example
158    /// ```rust,ignore
159    /// let (handle, worker) = batcher.spawn();
160    /// handle.insert(data).await?;
161    /// handle.close().await?;
162    /// worker.await?; // Ensure clean exit
163    /// ```
164    pub fn spawn(self) -> (BatcherHandle<T>, JoinHandle<()>) {
165        let (tx, rx) = mpsc::channel(self.config.channel_capacity);
166        let handle = BatcherHandle { sender: tx };
167
168        let join_handle = tokio::spawn(self.worker_loop(rx));
169
170        (handle, join_handle)
171    }
172
173    /// Worker loop that processes inserts and flushes
174    #[instrument(skip(self, rx), fields(table = T::table_name()), name = "batcher_worker")]
175    async fn worker_loop(self, mut rx: mpsc::Receiver<BatchMessage<T>>) {
176        debug!("Batcher worker started");
177
178        // Wrap logic to ensure we log exit
179        let _result = async {
180            let mut buffer = BatchBuffer::new(self.config.initial_buffer_size);
181            let mut row_count = 0usize;
182            let mut batch_start: Option<Instant> = None;
183            let mut flush_interval = tokio::time::interval(self.config.max_wait);
184            let mut schema_validated = false;
185
186            loop {
187                tokio::select! {
188                    msg = rx.recv() => {
189                        match msg {
190                            Some(BatchMessage::Insert(row)) => {
191                                // Validate schema on first insert
192                                if !schema_validated {
193                                    // Use the schema from T (ClickTable)
194                                    let schema = T::schema();
195                                    match self.client.validate_schema(T::table_name(), &schema).await {
196                                        Ok(()) => {
197                                            info!("Schema validation passed for table {}", T::table_name());
198                                            schema_validated = true;
199                                        }
200                                        Err(e) => {
201                                            error!(error = %e, "CRITICAL: Schema validation failed, dropping insert");
202                                            continue;
203                                        }
204                                    }
205                                }
206
207                                if batch_start.is_none() {
208                                    batch_start = Some(Instant::now());
209                                }
210
211                                if let Err(e) = row.write_row(&mut buffer) {
212                                    error!(error = %e, "Failed to serialize row");
213                                    continue;
214                                }
215
216                                row_count += 1;
217
218                                if row_count >= self.config.max_rows
219                                    || buffer.len() >= self.config.max_buffer_size
220                                {
221                                    let _ = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
222                                }
223                            }
224
225                            Some(BatchMessage::InsertMany(rows)) => {
226                                // Validate schema on first insert
227                                if !schema_validated {
228                                    let schema = T::schema();
229                                    match self.client.validate_schema(T::table_name(), &schema).await {
230                                        Ok(()) => {
231                                            info!("Schema validation passed for table {}", T::table_name());
232                                            schema_validated = true;
233                                        }
234                                        Err(e) => {
235                                            error!(error = %e, "CRITICAL: Schema validation failed, dropping batch");
236                                            continue;
237                                        }
238                                    }
239                                }
240
241                                if batch_start.is_none() {
242                                    batch_start = Some(Instant::now());
243                                }
244
245                                for row in rows {
246                                    if let Err(e) = row.write_row(&mut buffer) {
247                                        error!(error = %e, "Failed to serialize row in batch");
248                                        continue;
249                                    }
250                                    row_count += 1;
251                                }
252
253                                if row_count >= self.config.max_rows {
254                                    let _ = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
255                                }
256                            }
257
258                            Some(BatchMessage::Flush(respond)) => {
259                                let result = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
260                                let _ = respond.send(result);
261                            }
262
263                            Some(BatchMessage::Close(respond)) => {
264                                let result = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
265                                let _ = respond.send(result.map(|_| ()));
266                                return; // Exit loop
267                            }
268
269                            None => {
270                                // Channel closed
271                                let _ = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
272                                return; // Exit loop
273                            }
274                        }
275                    }
276
277                    _ = flush_interval.tick() => {
278                        if row_count > 0 {
279                            let _ = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
280                        }
281                    }
282                }
283            }
284        }.await;
285
286        info!("Batcher worker stopped");
287    }
288
289    /// Flush the buffer to ClickHouse
290    #[instrument(skip(self, buffer, row_count, batch_start), fields(rows = *row_count, bytes = buffer.len()))]
291    async fn flush_buffer(
292        &self,
293        buffer: &mut BatchBuffer,
294        row_count: &mut usize,
295        batch_start: &mut Option<Instant>,
296    ) -> Result<FlushStats> {
297        if *row_count == 0 {
298            return Ok(FlushStats {
299                rows_flushed: 0,
300                bytes_sent: 0,
301                duration: Duration::ZERO,
302                batch_age: Duration::ZERO,
303            });
304        }
305
306        let flush_start = Instant::now();
307        let batch_age = batch_start.map(|s| s.elapsed()).unwrap_or(Duration::ZERO);
308        let bytes_to_send = buffer.len();
309        let rows_to_flush = *row_count;
310        let table_name = T::table_name();
311
312        debug!(rows = rows_to_flush, bytes = bytes_to_send, table = %table_name, "Starting flush to ClickHouse");
313
314        // Try insert with retries
315        let mut last_error = None;
316        for attempt in 0..=self.config.max_retries {
317            if attempt > 0 {
318                warn!(attempt, max_retries = self.config.max_retries, "Retrying flush...");
319            }
320
321            match self.client.insert_binary(table_name, buffer.as_slice()).await {
322                Ok(()) => {
323                    let duration = flush_start.elapsed();
324                    let stats = FlushStats {
325                        rows_flushed: rows_to_flush,
326                        bytes_sent: bytes_to_send,
327                        duration,
328                        batch_age,
329                    };
330
331                    info!(
332                        rows = stats.rows_flushed,
333                        bytes = stats.bytes_sent,
334                        duration_ms = stats.duration.as_millis(),
335                        "Batch flush successful"
336                    );
337
338                    buffer.smart_clear(
339                        self.config.buffer_shrink_threshold,
340                        self.config.initial_buffer_size
341                    );
342                    *row_count = 0;
343                    *batch_start = None;
344
345                    return Ok(stats);
346                }
347                Err(e) => {
348                    let err = BatchError::FlushError(e.to_string());
349                    last_error = Some(err);
350                    error!(error = ?last_error, attempt, "Flush attempt failed");
351                    if attempt < self.config.max_retries {
352                        let delay = self.config.retry_base_delay * 2u32.pow(attempt);
353                        tokio::time::sleep(delay).await;
354                    }
355                }
356            }
357        }
358
359        let error_msg = format!("Insert failed after {} retries. Last error: {:?}", self.config.max_retries, last_error);
360        
361        error!(
362            error = %error_msg,
363            dropped_rows = rows_to_flush,
364            "CRITICAL: Dropping batch due to persistent error"
365        );
366
367        // CRITICAL: Clear buffer to prevent "poisoned buffer" scenario where bad data
368        // blocks all future inserts forever.
369        buffer.smart_clear(
370            self.config.buffer_shrink_threshold,
371            self.config.initial_buffer_size
372        );
373        *row_count = 0;
374        *batch_start = None;
375
376        Err(BatchError::FlushError(error_msg))
377    }
378}