jetstreamer_utils/
clickhouse.rs1use std::{future::Future, os::unix::fs::PermissionsExt, path::Path, pin::Pin, process::Stdio};
2
3use log;
4use tempfile::NamedTempFile;
5use tokio::{
6 fs::File,
7 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
8 process::Command,
9 sync::{OnceCell, mpsc},
10};
11
12fn process_log_line(line: impl AsRef<str>) {
13 let line = line.as_ref();
14 let prefix_len = "2025.05.07 20:25:31.905655 [ 3286299 ] {} ".len();
15 if line.len() > prefix_len {
16 match &line[prefix_len..] {
17 ln if ln.starts_with("<Information>") => {
18 let msg = &ln[14..];
19 let msg_trimmed = msg.trim();
20 if msg_trimmed.starts_with("(version ") {
22 return;
23 }
24 if !msg_trimmed.is_empty() {
25 log::info!("{}", msg)
26 }
27 }
28 ln if ln.starts_with("<Trace>") => log::trace!("{}", &ln[8..]),
29 ln if ln.starts_with("<Error>") => log::error!("{}", &ln[8..]),
30 ln if ln.starts_with("<Debug>") => log::debug!("{}", &ln[8..]),
31 ln if ln.starts_with("<Warning>") => log::warn!("{}", &ln[10..]),
32 _ => log::debug!("{}", line),
33 }
34 } else if !line.trim().is_empty() {
35 let t = line.trim();
36 if t.starts_with("(version ") {
38 return;
39 }
40 log::info!("{}", line);
41 }
42}
43
44static CLICKHOUSE_PROCESS: OnceCell<u32> = OnceCell::const_new();
45
46include!(concat!(env!("OUT_DIR"), "/embed_clickhouse.rs")); #[derive(Debug, Clone, PartialEq, Eq)]
50pub enum ClickhouseError {
51 Process(String),
53 InitializationFailed,
55}
56
57impl std::fmt::Display for ClickhouseError {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 match self {
60 ClickhouseError::Process(msg) => write!(f, "ClickHouse error: {}", msg),
61 ClickhouseError::InitializationFailed => {
62 write!(f, "ClickHouse initialization failed")
63 }
64 }
65 }
66}
67
68impl std::error::Error for ClickhouseError {}
69
70pub type ClickhouseProcessFuture = Pin<Box<dyn Future<Output = Result<(), ()>> + Send>>;
72pub type ClickhouseStartResult = (mpsc::Receiver<()>, ClickhouseProcessFuture);
74
75pub async fn start_client() -> Result<(), Box<dyn std::error::Error>> {
77 let clickhouse_path = NamedTempFile::with_suffix("-clickhouse")
78 .unwrap()
79 .into_temp_path()
80 .keep()
81 .unwrap();
82 log::info!("Writing ClickHouse binary to: {:?}", clickhouse_path);
83 File::create(&clickhouse_path)
84 .await
85 .unwrap()
86 .write_all(CLICKHOUSE_BINARY)
87 .await
88 .unwrap();
89 #[cfg(unix)]
91 std::fs::set_permissions(&clickhouse_path, std::fs::Permissions::from_mode(0o755)).unwrap();
92 log::info!("ClickHouse binary written and permissions set.");
93
94 let bin_dir = Path::new("./bin");
95 std::fs::create_dir_all(bin_dir).unwrap();
96
97 std::thread::sleep(std::time::Duration::from_secs(1));
98
99 Command::new(clickhouse_path)
101 .arg("client")
102 .arg("--host=localhost")
103 .current_dir(bin_dir)
104 .stdout(Stdio::inherit())
105 .stderr(Stdio::inherit())
106 .spawn()
107 .expect("Failed to start ClickHouse client process")
108 .wait()
109 .await?;
110
111 Ok(())
112}
113
114pub async fn start() -> Result<ClickhouseStartResult, ClickhouseError> {
116 log::info!("Spawning local ClickHouse server...");
117
118 let clickhouse_path = NamedTempFile::with_suffix("-clickhouse")
120 .unwrap()
121 .into_temp_path()
122 .keep()
123 .unwrap();
124 log::info!("Writing ClickHouse binary to: {:?}", clickhouse_path);
125 File::create(&clickhouse_path)
126 .await
127 .unwrap()
128 .write_all(CLICKHOUSE_BINARY)
129 .await
130 .unwrap();
131 #[cfg(unix)]
133 std::fs::set_permissions(&clickhouse_path, std::fs::Permissions::from_mode(0o755)).unwrap();
134 log::info!("ClickHouse binary written and permissions set.");
135
136 let (ready_tx, ready_rx) = mpsc::channel(1);
138
139 let bin_dir = Path::new("./bin");
140 std::fs::create_dir_all(bin_dir).unwrap();
141 std::thread::sleep(std::time::Duration::from_secs(1));
142 let mut clickhouse_command = unsafe {
143 Command::new(clickhouse_path)
144 .arg("server")
145 .stdout(Stdio::piped()) .stderr(Stdio::piped()) .current_dir(bin_dir)
149 .pre_exec(|| {
150 libc::setsid();
152 Ok(())
153 })
154 .spawn()
155 .map_err(|err| {
156 ClickhouseError::Process(format!("Failed to start the ClickHouse process: {}", err))
157 })?
158 };
159
160 let stdout = clickhouse_command
162 .stdout
163 .take()
164 .expect("Failed to capture stdout");
165 let stderr = clickhouse_command
166 .stderr
167 .take()
168 .expect("Failed to capture stderr");
169
170 let mut stdout_reader = BufReader::new(stdout).lines();
172 let mut stderr_reader = BufReader::new(stderr).lines();
173
174 tokio::spawn(async move {
176 let mut ready_signal_sent = false;
177 let mut other_pid: Option<u32> = None;
178 loop {
179 tokio::select! {
180 line = stdout_reader.next_line() => {
181 if let Ok(Some(line)) = line {
182 process_log_line(line);
183 }
184 }
185 line = stderr_reader.next_line() => {
186 if let Ok(Some(line)) = line {
187 if line.ends_with("Updating DNS cache") || line.ends_with("Updated DNS cache") {
188 continue;
190 }
191 process_log_line(&line);
192
193 if !ready_signal_sent && line.contains("Ready for connections") {
195 log::info!("ClickHouse is ready to accept connections.");
196
197 if let Err(err) = ready_tx.send(()).await {
199 log::error!("Failed to send readiness signal: {}", err);
200 }
201 ready_signal_sent = true;
202 } else if line.contains("DB::Server::run() @") {
203 log::warn!("ClickHouse server is already running, gracefully shutting down and restarting.");
204 let Some(other_pid) = other_pid else {
205 panic!("Failed to find the PID of the running ClickHouse server.");
206 };
207 if let Err(err) = Command::new("kill")
208 .arg("-s")
209 .arg("SIGTERM")
210 .arg(other_pid.to_string())
211 .status()
212 .await
213 {
214 log::error!("Failed to send SIGTERM to ClickHouse process: {}", err);
215 }
216 log::warn!("ClickHouse process with PID {} killed.", other_pid);
217 log::warn!("Please re-launch.");
218 std::process::exit(0);
219 } else if line.contains("PID: ")
220 && let Some(pid_str) = line.split_whitespace().nth(1)
221 && let Ok(pid) = pid_str.parse::<u32>() {
222 other_pid = Some(pid);
223 }
224 }
225 }
226 }
227 }
228 });
229
230 log::info!("Waiting for ClickHouse process to be ready.");
231
232 Ok((
234 ready_rx,
235 Box::pin(async move {
236 CLICKHOUSE_PROCESS
237 .set(clickhouse_command.id().unwrap())
238 .unwrap();
239 let status = clickhouse_command.wait().await;
240
241 match status {
242 Ok(status) => {
243 log::info!("ClickHouse exited with status: {}", status);
244 Ok(())
245 }
246 Err(err) => {
247 log::error!("Failed to wait on the ClickHouse process: {}", err);
248 Err(())
249 }
250 }
251 }),
252 ))
253}
254
255pub async fn stop() {
257 if let Some(&pid) = CLICKHOUSE_PROCESS.get() {
258 log::info!("Stopping ClickHouse process with PID: {}", pid);
259
260 let status = Command::new("kill").arg(pid.to_string()).status();
261
262 match status.await {
263 Ok(exit_status) if exit_status.success() => {
264 log::info!("ClickHouse process with PID {} stopped gracefully.", pid);
265 }
266 Ok(exit_status) => {
267 log::warn!(
268 "pkill executed, but ClickHouse process might not have stopped. Exit status: {}",
269 exit_status
270 );
271 }
272 Err(err) => {
273 log::error!("Failed to execute pkill for PID {}: {}", pid, err);
274 }
275 }
276 } else {
277 log::warn!("ClickHouse process PID not found in CLICKHOUSE_PROCESS.");
278 }
279}
280
281pub fn stop_sync() {
283 tokio::runtime::Builder::new_current_thread()
284 .enable_all()
285 .build()
286 .unwrap()
287 .block_on(stop());
288}