use std::{future::Future, os::unix::fs::PermissionsExt, path::Path, pin::Pin, process::Stdio};
use log;
use tempfile::NamedTempFile;
use tokio::{
fs::File,
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
process::Command,
sync::{OnceCell, mpsc},
};
fn process_log_line(line: impl AsRef<str>) {
let line = line.as_ref();
let prefix_len = "2025.05.07 20:25:31.905655 [ 3286299 ] {} ".len();
if line.len() > prefix_len {
match &line[prefix_len..] {
ln if ln.starts_with("<Information>") => {
let msg = &ln[14..];
let msg_trimmed = msg.trim();
if msg_trimmed.starts_with("(version ") {
return;
}
if !msg_trimmed.is_empty() {
log::info!("{}", msg)
}
}
ln if ln.starts_with("<Trace>") => log::trace!("{}", &ln[8..]),
ln if ln.starts_with("<Error>") => log::error!("{}", &ln[8..]),
ln if ln.starts_with("<Debug>") => log::debug!("{}", &ln[8..]),
ln if ln.starts_with("<Warning>") => log::warn!("{}", &ln[10..]),
_ => log::debug!("{}", line),
}
} else if !line.trim().is_empty() {
let t = line.trim();
if t.starts_with("(version ") {
return;
}
log::info!("{}", line);
}
}
/// PID of the spawned ClickHouse server process. It is written once by the future returned
/// from `start` and read by `stop` to know which process to signal.
static CLICKHOUSE_PROCESS: OnceCell<u32> = OnceCell::const_new();
// Textually includes `embed_clickhouse.rs` from the build's `OUT_DIR`. Presumably this is
// where `CLICKHOUSE_BINARY` (used by `start` and `start_client`) is defined — confirm against
// the build script.
include!(concat!(env!("OUT_DIR"), "/embed_clickhouse.rs"));
/// Failures reported by the embedded ClickHouse launcher.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClickhouseError {
    /// A process-level failure, carrying a human-readable description.
    Process(String),
    /// Initialization did not succeed; carries no further detail.
    InitializationFailed,
}

impl std::fmt::Display for ClickhouseError {
    /// Renders the user-facing message for each variant.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Process(reason) => write!(f, "ClickHouse error: {}", reason),
            Self::InitializationFailed => f.write_str("ClickHouse initialization failed"),
        }
    }
}

impl std::error::Error for ClickhouseError {}
/// Boxed, pinned, `Send` future that completes with `Ok(())` or `Err(())`. As produced by
/// `start`, it resolves once the ClickHouse server process has been waited on: `Ok(())` when
/// the wait succeeded, `Err(())` when waiting on the process failed.
pub type ClickhouseProcessFuture = Pin<Box<dyn Future<Output = Result<(), ()>> + Send>>;
/// Success value of `start`: a receiver that gets a `()` once the server logs that it is
/// ready for connections, paired with the future tracking the server process.
pub type ClickhouseStartResult = (mpsc::Receiver<()>, ClickhouseProcessFuture);
pub async fn start_client() -> Result<(), Box<dyn std::error::Error>> {
let clickhouse_path = NamedTempFile::with_suffix("-clickhouse")
.unwrap()
.into_temp_path()
.keep()
.unwrap();
log::info!("Writing ClickHouse binary to: {:?}", clickhouse_path);
File::create(&clickhouse_path)
.await
.unwrap()
.write_all(CLICKHOUSE_BINARY)
.await
.unwrap();
#[cfg(unix)]
std::fs::set_permissions(&clickhouse_path, std::fs::Permissions::from_mode(0o755)).unwrap();
log::info!("ClickHouse binary written and permissions set.");
let bin_dir = Path::new("./bin");
std::fs::create_dir_all(bin_dir).unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
Command::new(clickhouse_path)
.arg("client")
.arg("--host=localhost")
.current_dir(bin_dir)
.stdout(Stdio::inherit())
.stderr(Stdio::inherit())
.spawn()
.expect("Failed to start ClickHouse client process")
.wait()
.await?;
Ok(())
}
pub async fn start() -> Result<ClickhouseStartResult, ClickhouseError> {
log::info!("Spawning local ClickHouse server...");
let clickhouse_path = NamedTempFile::with_suffix("-clickhouse")
.unwrap()
.into_temp_path()
.keep()
.unwrap();
log::info!("Writing ClickHouse binary to: {:?}", clickhouse_path);
File::create(&clickhouse_path)
.await
.unwrap()
.write_all(CLICKHOUSE_BINARY)
.await
.unwrap();
#[cfg(unix)]
std::fs::set_permissions(&clickhouse_path, std::fs::Permissions::from_mode(0o755)).unwrap();
log::info!("ClickHouse binary written and permissions set.");
let (ready_tx, ready_rx) = mpsc::channel(1);
let bin_dir = Path::new("./bin");
std::fs::create_dir_all(bin_dir).unwrap();
std::thread::sleep(std::time::Duration::from_secs(1));
let mut clickhouse_command = unsafe {
Command::new(clickhouse_path)
.arg("server")
.stdout(Stdio::piped()) .stderr(Stdio::piped()) .current_dir(bin_dir)
.pre_exec(|| {
libc::setsid();
Ok(())
})
.spawn()
.map_err(|err| {
ClickhouseError::Process(format!("Failed to start the ClickHouse process: {}", err))
})?
};
let stdout = clickhouse_command
.stdout
.take()
.expect("Failed to capture stdout");
let stderr = clickhouse_command
.stderr
.take()
.expect("Failed to capture stderr");
let mut stdout_reader = BufReader::new(stdout).lines();
let mut stderr_reader = BufReader::new(stderr).lines();
tokio::spawn(async move {
let mut ready_signal_sent = false;
let mut other_pid: Option<u32> = None;
loop {
tokio::select! {
line = stdout_reader.next_line() => {
if let Ok(Some(line)) = line {
process_log_line(line);
}
}
line = stderr_reader.next_line() => {
if let Ok(Some(line)) = line {
if line.ends_with("Updating DNS cache") || line.ends_with("Updated DNS cache") {
continue;
}
process_log_line(&line);
if !ready_signal_sent && line.contains("Ready for connections") {
log::info!("ClickHouse is ready to accept connections.");
if let Err(err) = ready_tx.send(()).await {
log::error!("Failed to send readiness signal: {}", err);
}
ready_signal_sent = true;
} else if line.contains("DB::Server::run() @") {
log::warn!("ClickHouse server is already running, gracefully shutting down and restarting.");
let Some(other_pid) = other_pid else {
panic!("Failed to find the PID of the running ClickHouse server.");
};
if let Err(err) = Command::new("kill")
.arg("-s")
.arg("SIGTERM")
.arg(other_pid.to_string())
.status()
.await
{
log::error!("Failed to send SIGTERM to ClickHouse process: {}", err);
}
log::warn!("ClickHouse process with PID {} killed.", other_pid);
log::warn!("Please re-launch.");
std::process::exit(0);
} else if line.contains("PID: ")
&& let Some(pid_str) = line.split_whitespace().nth(1)
&& let Ok(pid) = pid_str.parse::<u32>() {
other_pid = Some(pid);
}
}
}
}
}
});
log::info!("Waiting for ClickHouse process to be ready.");
Ok((
ready_rx,
Box::pin(async move {
CLICKHOUSE_PROCESS
.set(clickhouse_command.id().unwrap())
.unwrap();
let status = clickhouse_command.wait().await;
match status {
Ok(status) => {
log::info!("ClickHouse exited with status: {}", status);
Ok(())
}
Err(err) => {
log::error!("Failed to wait on the ClickHouse process: {}", err);
Err(())
}
}
}),
))
}
pub async fn stop() {
if let Some(&pid) = CLICKHOUSE_PROCESS.get() {
log::info!("Stopping ClickHouse process with PID: {}", pid);
let status = Command::new("kill").arg(pid.to_string()).status();
match status.await {
Ok(exit_status) if exit_status.success() => {
log::info!("ClickHouse process with PID {} stopped gracefully.", pid);
}
Ok(exit_status) => {
log::warn!(
"pkill executed, but ClickHouse process might not have stopped. Exit status: {}",
exit_status
);
}
Err(err) => {
log::error!("Failed to execute pkill for PID {}: {}", pid, err);
}
}
} else {
log::warn!("ClickHouse process PID not found in CLICKHOUSE_PROCESS.");
}
}
/// Blocking wrapper around `stop` for callers that are not running inside an async context.
///
/// Builds a throwaway single-threaded tokio runtime with all drivers enabled and drives
/// `stop` to completion on it.
///
/// # Panics
///
/// Panics if the runtime cannot be built.
pub fn stop_sync() {
    let mut builder = tokio::runtime::Builder::new_current_thread();
    builder.enable_all();
    let runtime = builder.build().unwrap();
    runtime.block_on(stop());
}