#[allow(unused_imports)]
use crate::prelude::*;
use crate::{
types::*,
config::*
};
use backoff::future::retry;
use backoff::ExponentialBackoff;
use clickhouse::{Client, Reflection};
use serde::Serialize;
use scopeguard::guard;
#[derive(Clone)]
pub struct Writer {
cfg: ClickhouseWriterConfig
}
pub struct WriterState<A: Clone + Send> {
notify: Vec<A>,
}
#[derive(Debug, Clone)]
pub struct Notification<A: Clone + Send> {
pub table_name: String,
pub label: String,
pub since_last: Duration,
pub duration: Duration,
pub items: Vec<A>,
}
#[derive(Debug, Clone)]
pub struct GenericNotification {
pub table_name: String,
pub label: String,
pub since_last: Duration,
pub duration: Duration,
pub items: usize,
}
impl<A: Clone + Send> From<Notification<A>> for GenericNotification {
fn from(s: Notification<A>) -> Self {
GenericNotification{
table_name: s.table_name,
label: s.label,
since_last: s.since_last,
duration: s.duration,
items: s.items.len(),
}
}
}
impl Writer {
pub fn new(cfg: ClickhouseWriterConfig) -> Self {
Self {
cfg
}
}
pub async fn go_with_retry<A: Clone + std::fmt::Debug + Send, R: Reflection + Serialize, F: Fn(A) -> R>(
&self,
client: Client,
rx: Receiver<Vec<A>>,
notify_tx: Option<Sender<Notification<A>>>,
map: F,
) -> Result<()> {
let state = Arc::new(Mutex::new(WriterState { notify: Vec::new() }));
retry(ExponentialBackoff::default(), || {
let client = client.clone();
let rx = rx.clone();
let notify_tx = notify_tx.clone();
async {
self.go(client, rx, notify_tx, &map, state.clone())
.await
.map_err(backoff::Error::Transient)?;
Ok(())
}
})
.await
}
pub async fn go<A: Clone + std::fmt::Debug + Send, R: Reflection + Serialize, F: Fn(A) -> R>(
&self,
client: Client,
rx: Receiver<Vec<A>>,
notify_tx: Option<Sender<Notification<A>>>,
map: &F,
state: Arc<Mutex<WriterState<A>>>,
) -> Result<()> {
let mut inserter = client
.inserter(self.cfg.table_name.as_str())?
.with_max_entries(self.cfg.buffer_capacity as u64)
.with_max_duration(*self.cfg.force_write_duration);
let mut last_write = Instant::now();
let mut notify = guard( state.lock().unwrap().notify.clone(), |notify| {
let mut state = state.lock().unwrap();
state.notify = notify;
});
for el in notify.clone() {
inserter.write(&map(el)).await?;
}
let mut done = false;
while !done {
let timeout = time::sleep(*self.cfg.check_for_force_write_duration);
tokio::pin!(timeout);
tokio::select! {
els = rx.recv_async() => {
if els.is_err() {
done = true;
} else {
for el in els.unwrap() {
notify.push(el.clone());
inserter.write(&map(el)).await.context("error during inserter.write")?;
}
}
}
_ = &mut timeout => {
}
}
let t = Instant::now();
let s = inserter.commit().await.context("error during inserter.commit");
let since_last = last_write.elapsed();
let write_took = t.elapsed();
let s = s?;
if s.entries > 0 {
info!("Clickhouse write to {}/{}({} rows, {} transactions, {}ms since last) finished for {}ms.", &self.cfg.table_name, &self.cfg.label, s.entries, s.transactions, since_last.as_millis(), write_took.as_millis());
if notify_tx.is_some() {
let n = Notification {
label: self.cfg.label.clone(),
table_name: self.cfg.table_name.clone(),
since_last,
duration: write_took,
items: notify.clone(),
};
let _ = notify_tx.as_ref().unwrap().send_async(n).await;
}
notify.clear();
last_write = Instant::now();
}
}
inserter.end().await?;
Ok(())
}
}