mod playback_record;
use crate::config::ClickHouseConfig;
use anyhow::Result;
use aqueue::Actor;
use clickhouse::inserter::Inserter;
use clickhouse::{Client, Row};
pub use playback_record::PlayBackRecord;
use std::time::Duration;
pub struct RePlayClickHouseDB {
client: Client,
writer: Inserter<PlayBackRecord>,
}
impl RePlayClickHouseDB {
pub fn new(config: &ClickHouseConfig) -> Self {
let client = Client::default()
.with_user(&config.username)
.with_password(&config.password)
.with_database(&config.database)
.with_url(&config.url);
log::debug!("ClickHouseDB connected to {}", &config.url);
let writer = Self::make_inserter_static(&client, "playback_record");
Self { client, writer }
}
#[inline]
async fn write_replay_log(&mut self, data: &PlayBackRecord) -> Result<()> {
self.writer.write(data).await?;
self.writer.commit().await?;
Ok(())
}
#[inline]
async fn commit(&mut self) -> Result<()> {
self.writer.commit().await?;
Ok(())
}
#[inline]
fn make_inserter_static<T: Row>(client: &Client, table_name: &str) -> Inserter<T> {
client
.inserter(table_name)
.with_max_rows(1000)
.with_period(Some(Duration::from_secs(5)))
}
}
pub trait IRePlayClickHouseDB {
async fn write_replay_log(&self, data: &PlayBackRecord) -> Result<()>;
async fn commit(&self) -> Result<()>;
}
impl IRePlayClickHouseDB for Actor<RePlayClickHouseDB> {
#[inline]
async fn write_replay_log(&self, data: &PlayBackRecord) -> Result<()> {
self.inner_call(|inner| async move { inner.get_mut().write_replay_log(data).await })
.await
}
#[inline]
async fn commit(&self) -> Result<()> {
self.inner_call(|inner| async move { inner.get_mut().commit().await })
.await
}
}