use std::marker::PhantomData;
use std::time::{Duration, Instant};
use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle;
use tracing::{debug, error, info, instrument, warn};
use clicktype_core::traits::{ClickInsertable, ClickTable};
use clicktype_transport::{Client, ClickTypeTransport};
use crate::config::{BatchConfig, FlushStats};
use crate::buffer::BatchBuffer;
use crate::error::{BatchError, Result};
/// Commands sent from a `BatcherHandle` to the background batcher worker.
enum BatchMessage<T> {
/// Append a single row to the current batch.
Insert(T),
/// Append several rows to the current batch in one message.
InsertMany(Vec<T>),
/// Flush the current batch now; the flush outcome is sent back on the enclosed channel.
Flush(oneshot::Sender<Result<FlushStats>>),
/// Flush the current batch, report the outcome on the enclosed channel, then stop the worker.
Close(oneshot::Sender<Result<()>>),
}
/// Handle used to submit rows and control commands to a running batcher worker.
///
/// The handle only wraps the sending half of the worker's command channel, so it
/// can be cloned and shared between tasks regardless of the row type `T`.
pub struct BatcherHandle<T> {
    /// Sending half of the command channel consumed by the worker loop.
    sender: mpsc::Sender<BatchMessage<T>>,
}

// Implemented by hand instead of `#[derive(Clone)]`: the derive would add an
// implicit `T: Clone` bound, making handles for non-`Clone` row types
// uncloneable even though cloning a handle never clones a row.
impl<T> Clone for BatcherHandle<T> {
    fn clone(&self) -> Self {
        Self {
            sender: self.sender.clone(),
        }
    }
}
impl<T: ClickInsertable + Send + 'static> BatcherHandle<T> {
pub async fn insert(&self, row: T) -> Result<()> {
self.sender
.send(BatchMessage::Insert(row))
.await
.map_err(|_| BatchError::ChannelClosed)
}
pub fn try_insert(&self, row: T) -> Result<()> {
self.sender
.try_send(BatchMessage::Insert(row))
.map_err(|e| match e {
mpsc::error::TrySendError::Full(_) => BatchError::ChannelFull,
mpsc::error::TrySendError::Closed(_) => BatchError::ChannelClosed,
})
}
pub async fn insert_many(&self, rows: Vec<T>) -> Result<()> {
if rows.is_empty() {
return Ok(());
}
self.sender
.send(BatchMessage::InsertMany(rows))
.await
.map_err(|_| BatchError::ChannelClosed)
}
pub async fn flush(&self) -> Result<FlushStats> {
let (tx, rx) = oneshot::channel();
self.sender
.send(BatchMessage::Flush(tx))
.await
.map_err(|_| BatchError::ChannelClosed)?;
rx.await.map_err(|_| BatchError::ChannelClosed)?
}
pub async fn close(self) -> Result<()> {
let (tx, rx) = oneshot::channel();
self.sender
.send(BatchMessage::Close(tx))
.await
.map_err(|_| BatchError::ChannelClosed)?;
rx.await.map_err(|_| BatchError::ChannelClosed)?
}
}
/// Batches rows of type `T` and writes them through a transport `C`.
///
/// The value itself is inert; `spawn` moves it into a background task that
/// accumulates rows and flushes them according to `config`.
pub struct GenericBatcher<T: ClickInsertable, C: ClickTypeTransport> {
// Batching limits, retry settings and buffer sizing read by the worker.
config: BatchConfig,
// Transport used for schema validation and binary inserts.
client: C,
// Ties the batcher to its row type without storing a `T`.
_phantom: PhantomData<T>,
}
/// `GenericBatcher` specialised to the `Client` transport from `clicktype_transport`.
pub type Batcher<T> = GenericBatcher<T, Client>;
impl<T, C> GenericBatcher<T, C>
where
T: ClickInsertable + ClickTable + Send + Sync + 'static,
C: ClickTypeTransport + Send + Sync + 'static,
{
pub fn new(client: C, config: BatchConfig) -> Self {
Self {
config,
client,
_phantom: PhantomData,
}
}
pub fn spawn(self) -> (BatcherHandle<T>, JoinHandle<()>) {
let (tx, rx) = mpsc::channel(self.config.channel_capacity);
let handle = BatcherHandle { sender: tx };
let join_handle = tokio::spawn(self.worker_loop(rx));
(handle, join_handle)
}
#[instrument(skip(self, rx), fields(table = T::table_name()), name = "batcher_worker")]
async fn worker_loop(self, mut rx: mpsc::Receiver<BatchMessage<T>>) {
debug!("Batcher worker started");
let _result = async {
let mut buffer = BatchBuffer::new(self.config.initial_buffer_size);
let mut row_count = 0usize;
let mut batch_start: Option<Instant> = None;
let mut flush_interval = tokio::time::interval(self.config.max_wait);
let mut schema_validated = false;
loop {
tokio::select! {
msg = rx.recv() => {
match msg {
Some(BatchMessage::Insert(row)) => {
if !schema_validated {
let schema = T::schema();
match self.client.validate_schema(T::table_name(), &schema).await {
Ok(()) => {
info!("Schema validation passed for table {}", T::table_name());
schema_validated = true;
}
Err(e) => {
error!(error = %e, "CRITICAL: Schema validation failed, dropping insert");
continue;
}
}
}
if batch_start.is_none() {
batch_start = Some(Instant::now());
}
if let Err(e) = row.write_row(&mut buffer) {
error!(error = %e, "Failed to serialize row");
continue;
}
row_count += 1;
if row_count >= self.config.max_rows
|| buffer.len() >= self.config.max_buffer_size
{
let _ = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
}
}
Some(BatchMessage::InsertMany(rows)) => {
if !schema_validated {
let schema = T::schema();
match self.client.validate_schema(T::table_name(), &schema).await {
Ok(()) => {
info!("Schema validation passed for table {}", T::table_name());
schema_validated = true;
}
Err(e) => {
error!(error = %e, "CRITICAL: Schema validation failed, dropping batch");
continue;
}
}
}
if batch_start.is_none() {
batch_start = Some(Instant::now());
}
for row in rows {
if let Err(e) = row.write_row(&mut buffer) {
error!(error = %e, "Failed to serialize row in batch");
continue;
}
row_count += 1;
}
if row_count >= self.config.max_rows {
let _ = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
}
}
Some(BatchMessage::Flush(respond)) => {
let result = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
let _ = respond.send(result);
}
Some(BatchMessage::Close(respond)) => {
let result = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
let _ = respond.send(result.map(|_| ()));
return; }
None => {
let _ = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
return; }
}
}
_ = flush_interval.tick() => {
if row_count > 0 {
let _ = self.flush_buffer(&mut buffer, &mut row_count, &mut batch_start).await;
}
}
}
}
}.await;
info!("Batcher worker stopped");
}
#[instrument(skip(self, buffer, row_count, batch_start), fields(rows = *row_count, bytes = buffer.len()))]
async fn flush_buffer(
&self,
buffer: &mut BatchBuffer,
row_count: &mut usize,
batch_start: &mut Option<Instant>,
) -> Result<FlushStats> {
if *row_count == 0 {
return Ok(FlushStats {
rows_flushed: 0,
bytes_sent: 0,
duration: Duration::ZERO,
batch_age: Duration::ZERO,
});
}
let flush_start = Instant::now();
let batch_age = batch_start.map(|s| s.elapsed()).unwrap_or(Duration::ZERO);
let bytes_to_send = buffer.len();
let rows_to_flush = *row_count;
let table_name = T::table_name();
debug!(rows = rows_to_flush, bytes = bytes_to_send, table = %table_name, "Starting flush to ClickHouse");
let mut last_error = None;
for attempt in 0..=self.config.max_retries {
if attempt > 0 {
warn!(attempt, max_retries = self.config.max_retries, "Retrying flush...");
}
match self.client.insert_binary(table_name, buffer.as_slice()).await {
Ok(()) => {
let duration = flush_start.elapsed();
let stats = FlushStats {
rows_flushed: rows_to_flush,
bytes_sent: bytes_to_send,
duration,
batch_age,
};
info!(
rows = stats.rows_flushed,
bytes = stats.bytes_sent,
duration_ms = stats.duration.as_millis(),
"Batch flush successful"
);
buffer.smart_clear(
self.config.buffer_shrink_threshold,
self.config.initial_buffer_size
);
*row_count = 0;
*batch_start = None;
return Ok(stats);
}
Err(e) => {
let err = BatchError::FlushError(e.to_string());
last_error = Some(err);
error!(error = ?last_error, attempt, "Flush attempt failed");
if attempt < self.config.max_retries {
let delay = self.config.retry_base_delay * 2u32.pow(attempt);
tokio::time::sleep(delay).await;
}
}
}
}
let error_msg = format!("Insert failed after {} retries. Last error: {:?}", self.config.max_retries, last_error);
error!(
error = %error_msg,
dropped_rows = rows_to_flush,
"CRITICAL: Dropping batch due to persistent error"
);
buffer.smart_clear(
self.config.buffer_shrink_threshold,
self.config.initial_buffer_size
);
*row_count = 0;
*batch_start = None;
Err(BatchError::FlushError(error_msg))
}
}