use backoff::{future::retry, ExponentialBackoff};
use clickhouse::{Client, Row};
use serde::Serialize;
#[allow(unused_imports)]
use crate::prelude::*;
use crate::{config::*, types::*};
/// Batching writer that streams rows into a single ClickHouse table.
///
/// The writer itself only carries configuration; the connection, the input
/// channel and the retry state are passed to [`Writer::go`] /
/// [`Writer::go_with_retry`] on each call.
#[derive(Clone)]
pub struct Writer {
    /// Target table name, log label, buffer capacity and flush timings read
    /// by the write loop.
    cfg: ClickhouseWriterConfig,
}
/// State shared between successive attempts of [`Writer::go`].
///
/// It is created once by [`Writer::go_with_retry`] and handed to every retry
/// attempt so that rows taken from the channel but not yet reported as
/// written can be replayed into a fresh inserter.
///
/// The type parameter is intentionally unbounded: the container itself needs
/// no capabilities from `A`; the functions that use it state their own bounds.
#[derive(Debug)]
pub struct WriterState<A> {
    /// Rows received from the channel that have not yet been covered by a
    /// reported write.
    notify: Vec<A>,
}
/// Report describing one completed write, including the rows it covered.
///
/// [`Writer::go`] sends one of these on its optional notification channel
/// after a commit that actually wrote rows.
///
/// The type parameter carries no bounds on the struct itself; the derived
/// `Debug` and `Clone` impls add exactly the bound each of them needs.
#[derive(Debug, Clone)]
pub struct Notification<A> {
    /// Name of the table the rows were written to.
    pub table_name: String,
    /// Label of the writer that produced this report (taken from its config).
    pub label: String,
    /// Time elapsed since the previous reported write, or since the write
    /// loop started if there was none.
    pub since_last: Duration,
    /// How long the commit call itself took.
    pub duration: Duration,
    /// The source items that were pending when the write completed.
    pub items: Vec<A>,
}
/// Payload-agnostic summary of a [`Notification`].
///
/// Carries the same metadata but replaces the item list with its length, so
/// it can be passed around without knowing the item type.
#[derive(Debug, Clone)]
pub struct GenericNotification {
    /// Name of the table the rows were written to.
    pub table_name: String,
    /// Label of the writer that produced the report.
    pub label: String,
    /// Time elapsed since the previous reported write.
    pub since_last: Duration,
    /// How long the commit call itself took.
    pub duration: Duration,
    /// Number of items the original notification carried.
    pub items: usize,
}
impl<A: Clone + Send> From<Notification<A>> for GenericNotification {
fn from(s: Notification<A>) -> Self {
GenericNotification {
table_name: s.table_name,
label: s.label,
since_last: s.since_last,
duration: s.duration,
items: s.items.len(),
}
}
}
impl Writer {
pub fn new(cfg: ClickhouseWriterConfig) -> Self {
Self { cfg }
}
pub async fn go_with_retry<A: Clone + std::fmt::Debug + Send, R: Row + Serialize, F: Fn(A) -> R>(
&self,
client: Client,
rx: Receiver<Vec<A>>,
notify_tx: Option<Sender<Notification<A>>>,
map: F,
) -> Result<()> {
let state = Arc::new(Mutex::new(WriterState { notify: Vec::new() }));
retry(ExponentialBackoff::default(), || async {
self.go(client.clone(), rx.clone(), notify_tx.clone(), &map, state.clone())
.await
.map_err(backoff::Error::Transient)?;
Ok(())
})
.await
}
pub async fn go<A: Clone + std::fmt::Debug + Send, R: Row + Serialize, F: Fn(A) -> R>(
&self,
client: Client,
rx: Receiver<Vec<A>>,
notify_tx: Option<Sender<Notification<A>>>,
map: &F,
state: Arc<Mutex<WriterState<A>>>,
) -> Result<()> {
let mut inserter = client
.inserter(self.cfg.table_name.as_str())?
.with_max_entries(self.cfg.buffer_capacity as u64)
.with_max_duration(*self.cfg.force_write_duration);
let mut last_write = Instant::now();
let notify = { state.lock().unwrap().notify.clone() };
for el in notify.into_iter() {
inserter.write(&map(el)).await?;
}
let mut done = false;
while !done {
if let Ok(r) = timeout(*self.cfg.check_for_force_write_duration, rx.recv_async()).await {
for el in r.unwrap_or_else(|_| {
done = true;
vec![]
}) {
state.lock().unwrap().notify.push(el.clone());
inserter.write(&map(el)).await.context("error during inserter.write")?;
}
}
let t = Instant::now();
let s = inserter.commit().await.context("error during inserter.commit")?;
let since_last = last_write.elapsed();
let write_took = t.elapsed();
if s.entries > 0 {
info!(
"Clickhouse write to {}/{}({} rows, {} transactions, {}ms since last) finished for {}ms.",
&self.cfg.table_name,
&self.cfg.label,
s.entries,
s.transactions,
since_last.as_millis(),
write_took.as_millis()
);
if let Some(notify_tx) = ¬ify_tx {
let n = Notification {
label: self.cfg.label.clone(),
table_name: self.cfg.table_name.clone(),
since_last,
duration: write_took,
items: { state.lock().unwrap().notify.clone() },
};
let _ = notify_tx.send_async(n).await;
}
state.lock().unwrap().notify.clear();
last_write = Instant::now();
}
}
inserter.end().await?;
Ok(())
}
}