use std::time::Duration;
use log::error;
use crate::{Client, KlickhouseError};
#[derive(Clone)]
pub struct ClickhouseLock {
name: String,
cluster_str: String,
client: Client,
}
pub struct ClickhouseLockHandle<'a> {
lock: Option<&'a ClickhouseLock>,
}
impl ClickhouseLock {
pub fn new(client: Client, name: impl AsRef<str>) -> Self {
Self {
name: name.as_ref().to_string(),
client,
cluster_str: String::new(),
}
}
pub fn with_cluster(mut self, cluster: impl AsRef<str>) -> Self {
self.cluster_str = format!(" ON CLUSTER {}", cluster.as_ref());
self
}
pub async fn try_lock(&self) -> Result<Option<ClickhouseLockHandle<'_>>, KlickhouseError> {
let query = format!(
"CREATE TABLE _lock_{}{} (i Int64)ENGINE=Null",
self.name, self.cluster_str
);
match self.client.execute(&query).await {
Ok(()) => (),
Err(e) => {
let error = e.to_string();
if error.contains("already exists") {
return Ok(None);
} else {
return Err(e);
}
}
}
Ok(Some(ClickhouseLockHandle { lock: Some(self) }))
}
pub async fn lock(&self) -> Result<ClickhouseLockHandle<'_>, KlickhouseError> {
let query = format!(
"CREATE TABLE _lock_{}{} (i Int64)ENGINE=Null",
self.name, self.cluster_str
);
loop {
match self.client.execute(&query).await {
Ok(()) => break,
Err(e) => {
let error = e.to_string();
if error.contains("already exists") {
tokio::time::sleep(Duration::from_millis(100)).await;
} else {
return Err(e);
}
}
}
}
Ok(ClickhouseLockHandle { lock: Some(self) })
}
pub async fn reset(&self) -> Result<(), KlickhouseError> {
self.client
.execute(format!(
"DROP TABLE IF EXISTS _lock_{}{} SYNC",
self.name, self.cluster_str
))
.await
}
}
impl ClickhouseLockHandle<'_> {
pub async fn unlock(mut self) -> Result<(), KlickhouseError> {
self.lock.take().unwrap().reset().await
}
}
impl Drop for ClickhouseLockHandle<'_> {
fn drop(&mut self) {
if let Some(lock) = self.lock.take().cloned() {
tokio::spawn(async move {
if let Err(e) = lock.reset().await {
error!("failed to reset lock: {}: {e:?}", lock.name);
}
});
}
}
}