1use std::marker::PhantomData;
20use std::time::{Duration, Instant};
21use tokio::sync::{mpsc, oneshot};
22use tokio::task::JoinHandle;
23use tracing::{debug, error, info, instrument, warn};
24use clicktype_core::traits::{ClickInsertable, ClickTable};
25use clicktype_transport::{Client, ClickTypeTransport};
26use crate::config::{BatchConfig, FlushStats};
27use crate::buffer::BatchBuffer;
28use crate::error::{BatchError, Result};
29
30enum BatchMessage<T> {
32 Insert(T),
33 InsertMany(Vec<T>),
34 Flush(oneshot::Sender<Result<FlushStats>>),
35 Close(oneshot::Sender<Result<()>>),
36}
37
38#[derive(Clone)]
43pub struct BatcherHandle<T> {
44 sender: mpsc::Sender<BatchMessage<T>>,
45}
46
47impl<T: ClickInsertable + Send + 'static> BatcherHandle<T> {
48 pub async fn insert(&self, row: T) -> Result<()> {
57 self.sender
58 .send(BatchMessage::Insert(row))
59 .await
60 .map_err(|_| BatchError::ChannelClosed)
61 }
62
63 pub fn try_insert(&self, row: T) -> Result<()> {
73 self.sender
74 .try_send(BatchMessage::Insert(row))
75 .map_err(|e| match e {
76 mpsc::error::TrySendError::Full(_) => BatchError::ChannelFull,
77 mpsc::error::TrySendError::Closed(_) => BatchError::ChannelClosed,
78 })
79 }
80
81 pub async fn insert_many(&self, rows: Vec<T>) -> Result<()> {
85 if rows.is_empty() {
86 return Ok(());
87 }
88 self.sender
89 .send(BatchMessage::InsertMany(rows))
90 .await
91 .map_err(|_| BatchError::ChannelClosed)
92 }
93
94 pub async fn flush(&self) -> Result<FlushStats> {
98 let (tx, rx) = oneshot::channel();
99 self.sender
100 .send(BatchMessage::Flush(tx))
101 .await
102 .map_err(|_| BatchError::ChannelClosed)?;
103 rx.await.map_err(|_| BatchError::ChannelClosed)?
104 }
105
106 pub async fn close(self) -> Result<()> {
111 let (tx, rx) = oneshot::channel();
112 self.sender
113 .send(BatchMessage::Close(tx))
114 .await
115 .map_err(|_| BatchError::ChannelClosed)?;
116 rx.await.map_err(|_| BatchError::ChannelClosed)?
117 }
118}
119
120pub struct GenericBatcher<T: ClickInsertable, C: ClickTypeTransport> {
125 config: BatchConfig,
126 client: C,
127 _phantom: PhantomData<T>,
128}
129
130pub type Batcher<T> = GenericBatcher<T, Client>;
132
133impl<T, C> GenericBatcher<T, C>
134where
135 T: ClickInsertable + ClickTable + Send + Sync + 'static,
136 C: ClickTypeTransport + Send + Sync + 'static,
137{
138 pub fn new(client: C, config: BatchConfig) -> Self {
144 Self {
145 config,
146 client,
147 _phantom: PhantomData,
148 }
149 }
150
151 pub fn spawn(self) -> (BatcherHandle<T>, JoinHandle<()>) {
165 let (tx, rx) = mpsc::channel(self.config.channel_capacity);
166 let handle = BatcherHandle { sender: tx };
167
168 let join_handle = tokio::spawn(self.worker_loop(rx));
169
170 (handle, join_handle)
171 }
172
173 #[instrument(skip(self, rx), fields(table = T::table_name()), name = "batcher_worker")]
175 async fn worker_loop(self, mut rx: mpsc::Receiver<BatchMessage<T>>) {
176 debug!("Batcher worker started");
177
178 let _result = async {
180 let mut buffer = BatchBuffer::new(self.config.initial_buffer_size);
181 let mut row_count = 0usize;
182 let mut batch_start: Option<Instant> = None;
183 let mut flush_interval = tokio::time::interval(self.config.max_wait);
184 let mut schema_validated = false;
185
186 loop {
187 tokio::select! {
188 msg = rx.recv() => {
189 match msg {
190 Some(BatchMessage::Insert(row)) => {
191 if !schema_validated {
193 let schema = T::schema();
195 match self.client.validate_schema(T::table_name(), &schema).await {
196 Ok(()) => {
197 info!("Schema validation passed for table {}", T::table_name());
198 schema_validated = true;
199 }
200 Err(e) => {
201 error!(error = %e, "CRITICAL: Schema validation failed, dropping insert");
202 continue;
203 }
204 }
205 }
206
207 if batch_start.is_none() {
208 batch_start = Some(Instant::now());
209 }
210
211 if let Err(e) = row.write_row(&mut buffer) {
212 error!(error = %e, "Failed to serialize row");
213 continue;
214 }
215
216 row_count += 1;
217
218 if row_count >= self.config.max_rows
219 || buffer.len() >= self.config.max_buffer_size
220 {
221 let _ = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
222 }
223 }
224
225 Some(BatchMessage::InsertMany(rows)) => {
226 if !schema_validated {
228 let schema = T::schema();
229 match self.client.validate_schema(T::table_name(), &schema).await {
230 Ok(()) => {
231 info!("Schema validation passed for table {}", T::table_name());
232 schema_validated = true;
233 }
234 Err(e) => {
235 error!(error = %e, "CRITICAL: Schema validation failed, dropping batch");
236 continue;
237 }
238 }
239 }
240
241 if batch_start.is_none() {
242 batch_start = Some(Instant::now());
243 }
244
245 for row in rows {
246 if let Err(e) = row.write_row(&mut buffer) {
247 error!(error = %e, "Failed to serialize row in batch");
248 continue;
249 }
250 row_count += 1;
251 }
252
253 if row_count >= self.config.max_rows {
254 let _ = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
255 }
256 }
257
258 Some(BatchMessage::Flush(respond)) => {
259 let result = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
260 let _ = respond.send(result);
261 }
262
263 Some(BatchMessage::Close(respond)) => {
264 let result = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
265 let _ = respond.send(result.map(|_| ()));
266 return; }
268
269 None => {
270 let _ = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
272 return; }
274 }
275 }
276
277 _ = flush_interval.tick() => {
278 if row_count > 0 {
279 let _ = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
280 }
281 }
282 }
283 }
284 }.await;
285
286 info!("Batcher worker stopped");
287 }
288
289 #[instrument(skip(self, buffer, row_count, batch_start), fields(rows = *row_count, bytes = buffer.len()))]
291 async fn flush_buffer(
292 &self,
293 buffer: &mut BatchBuffer,
294 row_count: &mut usize,
295 batch_start: &mut Option<Instant>,
296 ) -> Result<FlushStats> {
297 if *row_count == 0 {
298 return Ok(FlushStats {
299 rows_flushed: 0,
300 bytes_sent: 0,
301 duration: Duration::ZERO,
302 batch_age: Duration::ZERO,
303 });
304 }
305
306 let flush_start = Instant::now();
307 let batch_age = batch_start.map(|s| s.elapsed()).unwrap_or(Duration::ZERO);
308 let bytes_to_send = buffer.len();
309 let rows_to_flush = *row_count;
310 let table_name = T::table_name();
311
312 debug!(rows = rows_to_flush, bytes = bytes_to_send, table = %table_name, "Starting flush to ClickHouse");
313
314 let mut last_error = None;
316 for attempt in 0..=self.config.max_retries {
317 if attempt > 0 {
318 warn!(attempt, max_retries = self.config.max_retries, "Retrying flush...");
319 }
320
321 match self.client.insert_binary(table_name, buffer.as_slice()).await {
322 Ok(()) => {
323 let duration = flush_start.elapsed();
324 let stats = FlushStats {
325 rows_flushed: rows_to_flush,
326 bytes_sent: bytes_to_send,
327 duration,
328 batch_age,
329 };
330
331 info!(
332 rows = stats.rows_flushed,
333 bytes = stats.bytes_sent,
334 duration_ms = stats.duration.as_millis(),
335 "Batch flush successful"
336 );
337
338 buffer.smart_clear(
339 self.config.buffer_shrink_threshold,
340 self.config.initial_buffer_size
341 );
342 *row_count = 0;
343 *batch_start = None;
344
345 return Ok(stats);
346 }
347 Err(e) => {
348 let err = BatchError::FlushError(e.to_string());
349 last_error = Some(err);
350 error!(error = ?last_error, attempt, "Flush attempt failed");
351 if attempt < self.config.max_retries {
352 let delay = self.config.retry_base_delay * 2u32.pow(attempt);
353 tokio::time::sleep(delay).await;
354 }
355 }
356 }
357 }
358
359 let error_msg = format!("Insert failed after {} retries. Last error: {:?}", self.config.max_retries, last_error);
360
361 error!(
362 error = %error_msg,
363 dropped_rows = rows_to_flush,
364 "CRITICAL: Dropping batch due to persistent error"
365 );
366
367 buffer.smart_clear(
370 self.config.buffer_shrink_threshold,
371 self.config.initial_buffer_size
372 );
373 *row_count = 0;
374 *batch_start = None;
375
376 Err(BatchError::FlushError(error_msg))
377 }
378}