jetstreamer_utils/
clickhouse.rs

1use std::{future::Future, os::unix::fs::PermissionsExt, path::Path, pin::Pin, process::Stdio};
2
3use log;
4use tempfile::NamedTempFile;
5use tokio::{
6    fs::File,
7    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
8    process::Command,
9    sync::{OnceCell, mpsc},
10};
11
12fn process_log_line(line: impl AsRef<str>) {
13    let line = line.as_ref();
14    let prefix_len = "2025.05.07 20:25:31.905655 [ 3286299 ] {} ".len();
15    if line.len() > prefix_len {
16        match &line[prefix_len..] {
17            ln if ln.starts_with("<Information>") => {
18                let msg = &ln[14..];
19                let msg_trimmed = msg.trim();
20                // Suppress noisy ClickHouse client version banner lines
21                if msg_trimmed.starts_with("(version ") {
22                    return;
23                }
24                if !msg_trimmed.is_empty() {
25                    log::info!("{}", msg)
26                }
27            }
28            ln if ln.starts_with("<Trace>") => log::trace!("{}", &ln[8..]),
29            ln if ln.starts_with("<Error>") => log::error!("{}", &ln[8..]),
30            ln if ln.starts_with("<Debug>") => log::debug!("{}", &ln[8..]),
31            ln if ln.starts_with("<Warning>") => log::warn!("{}", &ln[10..]),
32            _ => log::debug!("{}", line),
33        }
34    } else if !line.trim().is_empty() {
35        let t = line.trim();
36        // Suppress bare version banner lines that sometimes arrive without the standard prefix
37        if t.starts_with("(version ") {
38            return;
39        }
40        log::info!("{}", line);
41    }
42}
43
/// OS process id of the spawned ClickHouse server.
///
/// Being a `OnceCell`, it can be set at most once per program run. It is
/// written when a server is started via [`start`] and read by [`stop`].
static CLICKHOUSE_PROCESS: OnceCell<u32> = OnceCell::const_new();

// Pulls in a file generated under `OUT_DIR` at build time.
// NOTE(review): presumably this is what defines `CLICKHOUSE_BINARY` (used below
// when the executable is written to disk) — confirm against the build script.
include!(concat!(env!("OUT_DIR"), "/embed_clickhouse.rs")); // raw bytes for clickhouse binary
47
/// Failure modes reported while managing the embedded ClickHouse process.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClickhouseError {
    /// The ClickHouse process failed; the payload carries the error message.
    Process(String),
    /// The server did not complete its required initialization steps.
    InitializationFailed,
}

impl std::fmt::Display for ClickhouseError {
    /// Renders a short human-readable description of the error.
    fn fmt(&self, out: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::InitializationFailed => out.write_str("ClickHouse initialization failed"),
            Self::Process(detail) => {
                out.write_str("ClickHouse error: ")?;
                out.write_str(detail)
            }
        }
    }
}

impl std::error::Error for ClickhouseError {}
69
/// Future type returned when supervising the ClickHouse process.
///
/// It is a boxed, pinned, `Send` future. As produced by [`start`], it resolves to
/// `Ok(())` once the server process has exited and been waited on, and to
/// `Err(())` when waiting on the process failed.
pub type ClickhouseProcessFuture = Pin<Box<dyn Future<Output = Result<(), ()>> + Send>>;
/// Tuple containing the readiness channel and process future returned by [`start`].
///
/// The first element receives a `()` message when the server reports that it is
/// ready for connections; the second is the supervising [`ClickhouseProcessFuture`].
pub type ClickhouseStartResult = (mpsc::Receiver<()>, ClickhouseProcessFuture);
74
75/// Launches the bundled ClickHouse client binary and forwards STDIO.
76pub async fn start_client() -> Result<(), Box<dyn std::error::Error>> {
77    let clickhouse_path = NamedTempFile::with_suffix("-clickhouse")
78        .unwrap()
79        .into_temp_path()
80        .keep()
81        .unwrap();
82    log::info!("Writing ClickHouse binary to: {:?}", clickhouse_path);
83    File::create(&clickhouse_path)
84        .await
85        .unwrap()
86        .write_all(CLICKHOUSE_BINARY)
87        .await
88        .unwrap();
89    // executable permission for Unix
90    #[cfg(unix)]
91    std::fs::set_permissions(&clickhouse_path, std::fs::Permissions::from_mode(0o755)).unwrap();
92    log::info!("ClickHouse binary written and permissions set.");
93
94    let bin_dir = Path::new("./bin");
95    std::fs::create_dir_all(bin_dir).unwrap();
96
97    std::thread::sleep(std::time::Duration::from_secs(1));
98
99    // let clickhouse take over the current process
100    Command::new(clickhouse_path)
101        .arg("client")
102        .arg("--host=localhost")
103        .current_dir(bin_dir)
104        .stdout(Stdio::inherit())
105        .stderr(Stdio::inherit())
106        .spawn()
107        .expect("Failed to start ClickHouse client process")
108        .wait()
109        .await?;
110
111    Ok(())
112}
113
114/// Spawns the embedded ClickHouse server and returns a readiness channel plus process task.
115pub async fn start() -> Result<ClickhouseStartResult, ClickhouseError> {
116    log::info!("Spawning local ClickHouse server...");
117
118    // write clickhouse binary to a temp file
119    let clickhouse_path = NamedTempFile::with_suffix("-clickhouse")
120        .unwrap()
121        .into_temp_path()
122        .keep()
123        .unwrap();
124    log::info!("Writing ClickHouse binary to: {:?}", clickhouse_path);
125    File::create(&clickhouse_path)
126        .await
127        .unwrap()
128        .write_all(CLICKHOUSE_BINARY)
129        .await
130        .unwrap();
131    // executable permission for Unix
132    #[cfg(unix)]
133    std::fs::set_permissions(&clickhouse_path, std::fs::Permissions::from_mode(0o755)).unwrap();
134    log::info!("ClickHouse binary written and permissions set.");
135
136    // Create a channel to signal when ClickHouse is ready
137    let (ready_tx, ready_rx) = mpsc::channel(1);
138
139    let bin_dir = Path::new("./bin");
140    std::fs::create_dir_all(bin_dir).unwrap();
141    std::thread::sleep(std::time::Duration::from_secs(1));
142    let mut clickhouse_command = unsafe {
143        Command::new(clickhouse_path)
144            .arg("server")
145            //.arg("--async_insert_queue_flush_on_shutdown=1")
146            .stdout(Stdio::piped()) // Redirect stdout to capture logs
147            .stderr(Stdio::piped()) // Also capture stderr
148            .current_dir(bin_dir)
149            .pre_exec(|| {
150                // safety: setsid() can't fail if we're child of a real process
151                libc::setsid();
152                Ok(())
153            })
154            .spawn()
155            .map_err(|err| {
156                ClickhouseError::Process(format!("Failed to start the ClickHouse process: {}", err))
157            })?
158    };
159
160    // Capture stdout and stderr
161    let stdout = clickhouse_command
162        .stdout
163        .take()
164        .expect("Failed to capture stdout");
165    let stderr = clickhouse_command
166        .stderr
167        .take()
168        .expect("Failed to capture stderr");
169
170    // Create a combined reader for stdout and stderr
171    let mut stdout_reader = BufReader::new(stdout).lines();
172    let mut stderr_reader = BufReader::new(stderr).lines();
173
174    // Spawn a task to monitor both stdout and stderr for the "Ready for connections." message
175    tokio::spawn(async move {
176        let mut ready_signal_sent = false;
177        let mut other_pid: Option<u32> = None;
178        loop {
179            tokio::select! {
180                line = stdout_reader.next_line() => {
181                    if let Ok(Some(line)) = line {
182                        process_log_line(line);
183                    }
184                }
185                line = stderr_reader.next_line() => {
186                    if let Ok(Some(line)) = line {
187                        if line.ends_with("Updating DNS cache") || line.ends_with("Updated DNS cache") {
188                            // Ignore DNS cache update messages
189                            continue;
190                        }
191                        process_log_line(&line);
192
193                        // Check for "Ready for connections" message, ignoring extra formatting or invisible chars
194                        if !ready_signal_sent && line.contains("Ready for connections") {
195                            log::info!("ClickHouse is ready to accept connections.");
196
197                            // Send the readiness signal through the channel
198                            if let Err(err) = ready_tx.send(()).await {
199                                log::error!("Failed to send readiness signal: {}", err);
200                            }
201                            ready_signal_sent = true;
202                        } else if line.contains("DB::Server::run() @") {
203                            log::warn!("ClickHouse server is already running, gracefully shutting down and restarting.");
204                            let Some(other_pid) = other_pid else {
205                                panic!("Failed to find the PID of the running ClickHouse server.");
206                            };
207                            if let Err(err) = Command::new("kill")
208                                .arg("-s")
209                                .arg("SIGTERM")
210                                .arg(other_pid.to_string())
211                                .status()
212                                .await
213                            {
214                                log::error!("Failed to send SIGTERM to ClickHouse process: {}", err);
215                            }
216                            log::warn!("ClickHouse process with PID {} killed.", other_pid);
217                            log::warn!("Please re-launch.");
218                            std::process::exit(0);
219                        } else if line.contains("PID: ")
220                            && let Some(pid_str) = line.split_whitespace().nth(1)
221                                && let Ok(pid) = pid_str.parse::<u32>() {
222                                    other_pid = Some(pid);
223                                }
224                    }
225                }
226            }
227        }
228    });
229
230    log::info!("Waiting for ClickHouse process to be ready.");
231
232    // Return the receiver side of the channel and the future for the ClickHouse process
233    Ok((
234        ready_rx,
235        Box::pin(async move {
236            CLICKHOUSE_PROCESS
237                .set(clickhouse_command.id().unwrap())
238                .unwrap();
239            let status = clickhouse_command.wait().await;
240
241            match status {
242                Ok(status) => {
243                    log::info!("ClickHouse exited with status: {}", status);
244                    Ok(())
245                }
246                Err(err) => {
247                    log::error!("Failed to wait on the ClickHouse process: {}", err);
248                    Err(())
249                }
250            }
251        }),
252    ))
253}
254
255/// Stops the ClickHouse process asynchronously, if one is running.
256pub async fn stop() {
257    if let Some(&pid) = CLICKHOUSE_PROCESS.get() {
258        log::info!("Stopping ClickHouse process with PID: {}", pid);
259
260        let status = Command::new("kill").arg(pid.to_string()).status();
261
262        match status.await {
263            Ok(exit_status) if exit_status.success() => {
264                log::info!("ClickHouse process with PID {} stopped gracefully.", pid);
265            }
266            Ok(exit_status) => {
267                log::warn!(
268                    "pkill executed, but ClickHouse process might not have stopped. Exit status: {}",
269                    exit_status
270                );
271            }
272            Err(err) => {
273                log::error!("Failed to execute pkill for PID {}: {}", pid, err);
274            }
275        }
276    } else {
277        log::warn!("ClickHouse process PID not found in CLICKHOUSE_PROCESS.");
278    }
279}
280
281/// Synchronously stops the ClickHouse process by blocking on [`stop`].
282pub fn stop_sync() {
283    tokio::runtime::Builder::new_current_thread()
284        .enable_all()
285        .build()
286        .unwrap()
287        .block_on(stop());
288}