jetstreamer_utils/
clickhouse.rs1use std::{future::Future, os::unix::fs::PermissionsExt, path::Path, pin::Pin, process::Stdio};
2
3use log;
4use tempfile::NamedTempFile;
5use tokio::{
6 fs::File,
7 io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
8 process::Command,
9 sync::{OnceCell, mpsc},
10};
11
12fn process_log_line(line: impl AsRef<str>) {
13 let line = line.as_ref();
14 let prefix_len = "2025.05.07 20:25:31.905655 [ 3286299 ] {} ".len();
15 if line.len() > prefix_len {
16 match &line[prefix_len..] {
17 ln if ln.starts_with("<Information>") => {
18 let msg = &ln[14..];
19 let msg_trimmed = msg.trim();
20 if msg_trimmed.starts_with("(version ") {
22 return;
23 }
24 if !msg_trimmed.is_empty() {
25 log::info!("{}", msg)
26 }
27 }
28 ln if ln.starts_with("<Trace>") => log::trace!("{}", &ln[8..]),
29 ln if ln.starts_with("<Error>") => log::error!("{}", &ln[8..]),
30 ln if ln.starts_with("<Debug>") => log::debug!("{}", &ln[8..]),
31 ln if ln.starts_with("<Warning>") => log::warn!("{}", &ln[10..]),
32 _ => log::debug!("{}", line),
33 }
34 } else if !line.trim().is_empty() {
35 let t = line.trim();
36 if t.starts_with("(version ") {
38 return;
39 }
40 log::info!("{}", line);
41 }
42}
43
/// PID of the spawned ClickHouse server. It is written by the process future returned from
/// `start` (when that future is first polled) and read by `stop` to know which process to
/// signal. Being a `OnceCell`, it can be set at most once.
static CLICKHOUSE_PROCESS: OnceCell<u32> = OnceCell::const_new();
45
// Textually includes `embed_clickhouse.rs` from the build's `OUT_DIR`.
// NOTE(review): presumably this generated file is what defines `CLICKHOUSE_BINARY` used
// below — confirm against the build script.
include!(concat!(env!("OUT_DIR"), "/embed_clickhouse.rs"));

/// Errors reported by the ClickHouse launcher in this module.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClickhouseError {
    /// A process-level failure, carrying a human-readable description
    /// (for example, the server process could not be spawned).
    Process(String),
    /// Initialization failed. Not constructed in the portion of the file visible here.
    InitializationFailed,
}
56
57impl std::fmt::Display for ClickhouseError {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 match self {
60 ClickhouseError::Process(msg) => write!(f, "ClickHouse error: {}", msg),
61 ClickhouseError::InitializationFailed => {
62 write!(f, "ClickHouse initialization failed")
63 }
64 }
65 }
66}
67
// Empty impl: lets `ClickhouseError` be used wherever a `std::error::Error` is expected
// (e.g. boxed as `Box<dyn std::error::Error>`). It relies on the type's `Debug` derive and
// `Display` impl; no source error is exposed.
impl std::error::Error for ClickhouseError {}
69
/// Boxed, `Send` future tied to the lifetime of the ClickHouse server process. As built by
/// `start`, it resolves to `Ok(())` once waiting on the child succeeds and to `Err(())` if
/// waiting on the child fails.
pub type ClickhouseProcessFuture = Pin<Box<dyn Future<Output = Result<(), ()>> + Send>>;
/// Success value of `start`: a receiver that gets `()` when the server reports it is ready
/// for connections, paired with the process future described above.
pub type ClickhouseStartResult = (mpsc::Receiver<()>, ClickhouseProcessFuture);
74
75pub async fn start_client() -> Result<(), Box<dyn std::error::Error>> {
77 let clickhouse_path = NamedTempFile::with_suffix("-clickhouse")
78 .unwrap()
79 .into_temp_path()
80 .keep()
81 .unwrap();
82 log::info!("Writing ClickHouse binary to: {:?}", clickhouse_path);
83 File::create(&clickhouse_path)
84 .await
85 .unwrap()
86 .write_all(CLICKHOUSE_BINARY)
87 .await
88 .unwrap();
89 #[cfg(unix)]
91 std::fs::set_permissions(&clickhouse_path, std::fs::Permissions::from_mode(0o755)).unwrap();
92 log::info!("ClickHouse binary written and permissions set.");
93
94 let bin_dir = Path::new("./bin");
95 std::fs::create_dir_all(bin_dir).unwrap();
96
97 std::thread::sleep(std::time::Duration::from_secs(1));
98
99 Command::new(clickhouse_path)
101 .arg("client")
102 .arg("--host=localhost")
103 .current_dir(bin_dir)
104 .stdout(Stdio::inherit())
105 .stderr(Stdio::inherit())
106 .spawn()
107 .expect("Failed to start ClickHouse client process")
108 .wait()
109 .await?;
110
111 Ok(())
112}
113
114pub async fn start() -> Result<ClickhouseStartResult, ClickhouseError> {
116 log::info!("Spawning local ClickHouse server...");
117
118 let clickhouse_path = NamedTempFile::with_suffix("-clickhouse")
120 .unwrap()
121 .into_temp_path()
122 .keep()
123 .unwrap();
124 log::info!("Writing ClickHouse binary to: {:?}", clickhouse_path);
125 File::create(&clickhouse_path)
126 .await
127 .unwrap()
128 .write_all(CLICKHOUSE_BINARY)
129 .await
130 .unwrap();
131 #[cfg(unix)]
133 std::fs::set_permissions(&clickhouse_path, std::fs::Permissions::from_mode(0o755)).unwrap();
134 log::info!("ClickHouse binary written and permissions set.");
135
136 let (ready_tx, ready_rx) = mpsc::channel(1);
138
139 let bin_dir = Path::new("./bin");
140 std::fs::create_dir_all(bin_dir).unwrap();
141 std::thread::sleep(std::time::Duration::from_secs(1));
142 let mut clickhouse_command = unsafe {
143 Command::new(clickhouse_path)
144 .arg("server")
145 .stdout(Stdio::piped()) .stderr(Stdio::piped()) .current_dir(bin_dir)
154 .pre_exec(|| {
155 libc::setsid();
157 Ok(())
158 })
159 .spawn()
160 .map_err(|err| {
161 ClickhouseError::Process(format!("Failed to start the ClickHouse process: {}", err))
162 })?
163 };
164
165 let stdout = clickhouse_command
167 .stdout
168 .take()
169 .expect("Failed to capture stdout");
170 let stderr = clickhouse_command
171 .stderr
172 .take()
173 .expect("Failed to capture stderr");
174
175 let mut stdout_reader = BufReader::new(stdout).lines();
177 let mut stderr_reader = BufReader::new(stderr).lines();
178
179 tokio::spawn(async move {
181 let mut ready_signal_sent = false;
182 let mut other_pid: Option<u32> = None;
183 loop {
184 tokio::select! {
185 line = stdout_reader.next_line() => {
186 if let Ok(Some(line)) = line {
187 process_log_line(line);
188 }
189 }
190 line = stderr_reader.next_line() => {
191 if let Ok(Some(line)) = line {
192 if line.ends_with("Updating DNS cache") || line.ends_with("Updated DNS cache") {
193 continue;
195 }
196 process_log_line(&line);
197
198 if !ready_signal_sent && line.contains("Ready for connections") {
200 log::info!("ClickHouse is ready to accept connections.");
201
202 if let Err(err) = ready_tx.send(()).await {
204 log::error!("Failed to send readiness signal: {}", err);
205 }
206 ready_signal_sent = true;
207 } else if line.contains("DB::Server::run() @") {
208 log::warn!("ClickHouse server is already running, gracefully shutting down and restarting.");
209 let Some(other_pid) = other_pid else {
210 panic!("Failed to find the PID of the running ClickHouse server.");
211 };
212 if let Err(err) = Command::new("kill")
213 .arg("-s")
214 .arg("SIGTERM")
215 .arg(other_pid.to_string())
216 .status()
217 .await
218 {
219 log::error!("Failed to send SIGTERM to ClickHouse process: {}", err);
220 }
221 log::warn!("ClickHouse process with PID {} killed.", other_pid);
222 log::warn!("Please re-launch.");
223 std::process::exit(0);
224 } else if line.contains("PID: ")
225 && let Some(pid_str) = line.split_whitespace().nth(1)
226 && let Ok(pid) = pid_str.parse::<u32>() {
227 other_pid = Some(pid);
228 }
229 }
230 }
231 }
232 }
233 });
234
235 log::info!("Waiting for ClickHouse process to be ready.");
236
237 Ok((
239 ready_rx,
240 Box::pin(async move {
241 CLICKHOUSE_PROCESS
242 .set(clickhouse_command.id().unwrap())
243 .unwrap();
244 let status = clickhouse_command.wait().await;
245
246 match status {
247 Ok(status) => {
248 log::info!("ClickHouse exited with status: {}", status);
249 Ok(())
250 }
251 Err(err) => {
252 log::error!("Failed to wait on the ClickHouse process: {}", err);
253 Err(())
254 }
255 }
256 }),
257 ))
258}
259
260pub async fn stop() {
262 if let Some(&pid) = CLICKHOUSE_PROCESS.get() {
263 log::info!("Stopping ClickHouse process with PID: {}", pid);
264
265 let status = Command::new("kill").arg(pid.to_string()).status();
266
267 match status.await {
268 Ok(exit_status) if exit_status.success() => {
269 log::info!("ClickHouse process with PID {} stopped gracefully.", pid);
270 }
271 Ok(exit_status) => {
272 log::warn!(
273 "pkill executed, but ClickHouse process might not have stopped. Exit status: {}",
274 exit_status
275 );
276 }
277 Err(err) => {
278 log::error!("Failed to execute pkill for PID {}: {}", pid, err);
279 }
280 }
281 } else {
282 log::warn!("ClickHouse process PID not found in CLICKHOUSE_PROCESS.");
283 }
284}
285
286pub fn stop_sync() {
288 tokio::runtime::Builder::new_current_thread()
289 .enable_all()
290 .build()
291 .unwrap()
292 .block_on(stop());
293}