Skip to main content

clickhouse_connection_pool/
batch_processor.rs

1use tokio::sync::mpsc;
2
3use crate::pool::ClickhouseError;
4
/// Command carried over the batching channel.
///
/// The standard traits are derived so commands can be logged with `{:?}`,
/// duplicated, and compared in tests. Each derived impl only exists when `T`
/// itself implements the corresponding trait.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum BatchCommand<T> {
    /// Carries a single item of type `T`.
    Add(T),
    /// Carries no payload.
    // NOTE(review): presumably asks the receiving side to write out whatever
    // it has buffered — the consumer is not defined in this file; confirm.
    Flush,
}
9
10pub struct BatchSender<T> {
11    pub tx: mpsc::Sender<BatchCommand<T>>,
12}
13
14impl<T> BatchSender<T> {
15    pub async fn add(&self, item: T) -> Result<(), ClickhouseError> {
16        self.tx
17            .send(BatchCommand::Add(item))
18            .await
19            .map_err(|_| ClickhouseError::BatchInsertionError("Channel closed".to_string()))
20    }
21
22    pub async fn flush(&self) -> Result<(), ClickhouseError> {
23        self.tx
24            .send(BatchCommand::Flush)
25            .await
26            .map_err(|_| ClickhouseError::BatchInsertionError("Channel closed".to_string()))
27    }
28}
29
30impl<T> Clone for BatchSender<T> {
31    fn clone(&self) -> Self {
32        Self {
33            tx: self.tx.clone(),
34        }
35    }
36}