use backoff::{future::retry, ExponentialBackoff};
use clickhouse::{Client, Row};
use serde::Serialize;
#[allow(unused_imports)]
use crate::prelude::*;
use crate::{config::*, types::*};
#[derive(Clone)]
pub struct Writer {
cfg: ClickhouseWriterConfig,
}
pub struct WriterState<A: Clone + Send> {
notify: Vec<A>,
}
#[derive(Debug, Clone)]
pub struct Notification<A: Clone + Send> {
pub table_name: String,
pub label: String,
pub since_last: Duration,
pub duration: Duration,
pub items: Vec<A>,
}
#[derive(Debug, Clone)]
pub struct GenericNotification {
pub table_name: String,
pub label: String,
pub since_last: Duration,
pub duration: Duration,
pub items: usize,
}
impl<A: Clone + Send> From<Notification<A>> for GenericNotification {
fn from(s: Notification<A>) -> Self {
GenericNotification {
table_name: s.table_name,
label: s.label,
since_last: s.since_last,
duration: s.duration,
items: s.items.len(),
}
}
}
impl Writer {
pub fn new(cfg: ClickhouseWriterConfig) -> Self {
Self { cfg }
}
pub async fn go_with_retry<
A: Clone + std::fmt::Debug + Send + 'static,
R: Row + Serialize + Send + Sync + 'static,
F: Fn(A) -> R + Send + Sync + 'static,
>(
&self,
client: Client,
rx: Receiver<Vec<A>>,
notify_tx: Option<Sender<Notification<A>>>,
map: F,
) -> Result<()> {
let state = Arc::new(Mutex::new(WriterState { notify: Vec::new() }));
let map = Arc::new(map);
retry(ExponentialBackoff::default(), || async {
Writer::go(
self.cfg.clone(),
client.clone(),
rx.clone(),
notify_tx.clone(),
Arc::clone(&map),
state.clone(),
)
.instrument()
.await
.map_err(backoff::Error::Transient)?;
Ok(())
})
.await
}
pub fn go<
A: Clone + std::fmt::Debug + Send + 'static,
R: Row + Serialize + Send + Sync + 'static,
F: Fn(A) -> R + Send + Sync + 'static,
>(
cfg: ClickhouseWriterConfig,
client: Client,
rx: Receiver<Vec<A>>,
notify_tx: Option<Sender<Notification<A>>>,
map: Arc<F>,
state: Arc<Mutex<WriterState<A>>>,
) -> TracingTask<'static> {
TracingTask::new(span!(), async move {
let mut inserter = client
.inserter(cfg.table_name.as_str())?
.with_max_entries(cfg.buffer_capacity as u64)
.with_max_duration(*cfg.force_write_duration);
let mut last_write = Instant::now();
let notify = { state.lock().unwrap().notify.clone() };
for el in notify.into_iter() {
let mapped = map(el);
inserter.write(&mapped).await?;
}
let mut done = false;
while !done {
if let Ok(r) = timeout(*cfg.check_for_force_write_duration, rx.recv_async()).await {
for el in r.unwrap_or_else(|_| {
done = true;
vec![]
}) {
state.lock().unwrap().notify.push(el.clone());
inserter.write(&map(el)).await.context("error during inserter.write")?;
}
}
let t = Instant::now();
let s = inserter.commit().await.context("error during inserter.commit")?;
let since_last = last_write.elapsed();
let write_took = t.elapsed();
if s.entries > 0 {
info!(
"Clickhouse write to {}/{}({} rows, {} transactions, {}ms since last) finished for {}ms.",
&cfg.table_name,
&cfg.label,
s.entries,
s.transactions,
since_last.as_millis(),
write_took.as_millis()
);
if let Some(notify_tx) = ¬ify_tx {
let n = Notification {
label: cfg.label.clone(),
table_name: cfg.table_name.clone(),
since_last,
duration: write_took,
items: { state.lock().unwrap().notify.clone() },
};
let _ = notify_tx.send_async(n).await;
}
state.lock().unwrap().notify.clear();
last_write = Instant::now();
}
}
inserter.end().await?;
Ok(())
})
}
}