Skip to main content

jetstreamer_utils/
clickhouse.rs

1use std::{future::Future, os::unix::fs::PermissionsExt, path::Path, pin::Pin, process::Stdio};
2
3use log;
4use tempfile::NamedTempFile;
5use tokio::{
6    fs::File,
7    io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
8    process::Command,
9    sync::{OnceCell, mpsc},
10};
11
12fn process_log_line(line: impl AsRef<str>) {
13    let line = line.as_ref();
14    let prefix_len = "2025.05.07 20:25:31.905655 [ 3286299 ] {} ".len();
15    if line.len() > prefix_len {
16        match &line[prefix_len..] {
17            ln if ln.starts_with("<Information>") => {
18                let msg = &ln[14..];
19                let msg_trimmed = msg.trim();
20                // Suppress noisy ClickHouse client version banner lines
21                if msg_trimmed.starts_with("(version ") {
22                    return;
23                }
24                if !msg_trimmed.is_empty() {
25                    log::info!("{}", msg)
26                }
27            }
28            ln if ln.starts_with("<Trace>") => log::trace!("{}", &ln[8..]),
29            ln if ln.starts_with("<Error>") => log::error!("{}", &ln[8..]),
30            ln if ln.starts_with("<Debug>") => log::debug!("{}", &ln[8..]),
31            ln if ln.starts_with("<Warning>") => log::warn!("{}", &ln[10..]),
32            _ => log::debug!("{}", line),
33        }
34    } else if !line.trim().is_empty() {
35        let t = line.trim();
36        // Suppress bare version banner lines that sometimes arrive without the standard prefix
37        if t.starts_with("(version ") {
38            return;
39        }
40        log::info!("{}", line);
41    }
42}
43
/// PID of the ClickHouse server launched by [`start`]; [`stop`] reads it to signal that
/// process. Set at most once per program run.
static CLICKHOUSE_PROCESS: OnceCell<u32> = OnceCell::const_new();

// Build-script output (from `OUT_DIR`); expected to define `CLICKHOUSE_BINARY`, the raw bytes
// of the bundled ClickHouse executable that `start` and `start_client` write to disk.
include!(concat!(env!("OUT_DIR"), "/embed_clickhouse.rs")); // raw bytes for clickhouse binary
47
/// Failure modes reported while managing the embedded ClickHouse process.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ClickhouseError {
    /// The ClickHouse process failed; carries a human-readable reason.
    Process(String),
    /// The server did not complete its required initialization steps.
    InitializationFailed,
}

impl std::fmt::Display for ClickhouseError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Process(reason) => write!(f, "ClickHouse error: {reason}"),
            Self::InitializationFailed => f.write_str("ClickHouse initialization failed"),
        }
    }
}

impl std::error::Error for ClickhouseError {}
69
70/// Future type returned when supervising the ClickHouse process.
71pub type ClickhouseProcessFuture = Pin<Box<dyn Future<Output = Result<(), ()>> + Send>>;
72/// Tuple containing the readiness channel and process future returned by [`start`].
73pub type ClickhouseStartResult = (mpsc::Receiver<()>, ClickhouseProcessFuture);
74
75/// Launches the bundled ClickHouse client binary and forwards STDIO.
76pub async fn start_client() -> Result<(), Box<dyn std::error::Error>> {
77    let clickhouse_path = NamedTempFile::with_suffix("-clickhouse")
78        .unwrap()
79        .into_temp_path()
80        .keep()
81        .unwrap();
82    log::info!("Writing ClickHouse binary to: {:?}", clickhouse_path);
83    File::create(&clickhouse_path)
84        .await
85        .unwrap()
86        .write_all(CLICKHOUSE_BINARY)
87        .await
88        .unwrap();
89    // executable permission for Unix
90    #[cfg(unix)]
91    std::fs::set_permissions(&clickhouse_path, std::fs::Permissions::from_mode(0o755)).unwrap();
92    log::info!("ClickHouse binary written and permissions set.");
93
94    let bin_dir = Path::new("./bin");
95    std::fs::create_dir_all(bin_dir).unwrap();
96
97    std::thread::sleep(std::time::Duration::from_secs(1));
98
99    // let clickhouse take over the current process
100    Command::new(clickhouse_path)
101        .arg("client")
102        .arg("--host=localhost")
103        .current_dir(bin_dir)
104        .stdout(Stdio::inherit())
105        .stderr(Stdio::inherit())
106        .spawn()
107        .expect("Failed to start ClickHouse client process")
108        .wait()
109        .await?;
110
111    Ok(())
112}
113
114/// Spawns the embedded ClickHouse server and returns a readiness channel plus process task.
115pub async fn start() -> Result<ClickhouseStartResult, ClickhouseError> {
116    log::info!("Spawning local ClickHouse server...");
117
118    // write clickhouse binary to a temp file
119    let clickhouse_path = NamedTempFile::with_suffix("-clickhouse")
120        .unwrap()
121        .into_temp_path()
122        .keep()
123        .unwrap();
124    log::info!("Writing ClickHouse binary to: {:?}", clickhouse_path);
125    File::create(&clickhouse_path)
126        .await
127        .unwrap()
128        .write_all(CLICKHOUSE_BINARY)
129        .await
130        .unwrap();
131    // executable permission for Unix
132    #[cfg(unix)]
133    std::fs::set_permissions(&clickhouse_path, std::fs::Permissions::from_mode(0o755)).unwrap();
134    log::info!("ClickHouse binary written and permissions set.");
135
136    // Create a channel to signal when ClickHouse is ready
137    let (ready_tx, ready_rx) = mpsc::channel(1);
138
139    let bin_dir = Path::new("./bin");
140    std::fs::create_dir_all(bin_dir).unwrap();
141    std::thread::sleep(std::time::Duration::from_secs(1));
142    let mut clickhouse_command = unsafe {
143        Command::new(clickhouse_path)
144            .arg("server")
145            //.arg("--async_insert_queue_flush_on_shutdown=1")
146            // NOTE: leaving ClickHouse at its default `trace` log level. Lower levels
147            // (`information`, `warning`) suppress the "Ready for connections" banner that the
148            // readiness scanner below looks for, so the firehose hangs at startup. The
149            // AsyncLogMessageQueue overflow warnings under high-throughput async inserts are
150            // noise from this trace verbosity but are not a correctness issue.
151            .stdout(Stdio::piped()) // Redirect stdout to capture logs
152            .stderr(Stdio::piped()) // Also capture stderr
153            .current_dir(bin_dir)
154            .pre_exec(|| {
155                // safety: setsid() can't fail if we're child of a real process
156                libc::setsid();
157                Ok(())
158            })
159            .spawn()
160            .map_err(|err| {
161                ClickhouseError::Process(format!("Failed to start the ClickHouse process: {}", err))
162            })?
163    };
164
165    // Capture stdout and stderr
166    let stdout = clickhouse_command
167        .stdout
168        .take()
169        .expect("Failed to capture stdout");
170    let stderr = clickhouse_command
171        .stderr
172        .take()
173        .expect("Failed to capture stderr");
174
175    // Create a combined reader for stdout and stderr
176    let mut stdout_reader = BufReader::new(stdout).lines();
177    let mut stderr_reader = BufReader::new(stderr).lines();
178
179    // Spawn a task to monitor both stdout and stderr for the "Ready for connections." message
180    tokio::spawn(async move {
181        let mut ready_signal_sent = false;
182        let mut other_pid: Option<u32> = None;
183        loop {
184            tokio::select! {
185                line = stdout_reader.next_line() => {
186                    if let Ok(Some(line)) = line {
187                        process_log_line(line);
188                    }
189                }
190                line = stderr_reader.next_line() => {
191                    if let Ok(Some(line)) = line {
192                        if line.ends_with("Updating DNS cache") || line.ends_with("Updated DNS cache") {
193                            // Ignore DNS cache update messages
194                            continue;
195                        }
196                        process_log_line(&line);
197
198                        // Check for "Ready for connections" message, ignoring extra formatting or invisible chars
199                        if !ready_signal_sent && line.contains("Ready for connections") {
200                            log::info!("ClickHouse is ready to accept connections.");
201
202                            // Send the readiness signal through the channel
203                            if let Err(err) = ready_tx.send(()).await {
204                                log::error!("Failed to send readiness signal: {}", err);
205                            }
206                            ready_signal_sent = true;
207                        } else if line.contains("DB::Server::run() @") {
208                            log::warn!("ClickHouse server is already running, gracefully shutting down and restarting.");
209                            let Some(other_pid) = other_pid else {
210                                panic!("Failed to find the PID of the running ClickHouse server.");
211                            };
212                            if let Err(err) = Command::new("kill")
213                                .arg("-s")
214                                .arg("SIGTERM")
215                                .arg(other_pid.to_string())
216                                .status()
217                                .await
218                            {
219                                log::error!("Failed to send SIGTERM to ClickHouse process: {}", err);
220                            }
221                            log::warn!("ClickHouse process with PID {} killed.", other_pid);
222                            log::warn!("Please re-launch.");
223                            std::process::exit(0);
224                        } else if line.contains("PID: ")
225                            && let Some(pid_str) = line.split_whitespace().nth(1)
226                                && let Ok(pid) = pid_str.parse::<u32>() {
227                                    other_pid = Some(pid);
228                                }
229                    }
230                }
231            }
232        }
233    });
234
235    log::info!("Waiting for ClickHouse process to be ready.");
236
237    // Return the receiver side of the channel and the future for the ClickHouse process
238    Ok((
239        ready_rx,
240        Box::pin(async move {
241            CLICKHOUSE_PROCESS
242                .set(clickhouse_command.id().unwrap())
243                .unwrap();
244            let status = clickhouse_command.wait().await;
245
246            match status {
247                Ok(status) => {
248                    log::info!("ClickHouse exited with status: {}", status);
249                    Ok(())
250                }
251                Err(err) => {
252                    log::error!("Failed to wait on the ClickHouse process: {}", err);
253                    Err(())
254                }
255            }
256        }),
257    ))
258}
259
260/// Stops the ClickHouse process asynchronously, if one is running.
261pub async fn stop() {
262    if let Some(&pid) = CLICKHOUSE_PROCESS.get() {
263        log::info!("Stopping ClickHouse process with PID: {}", pid);
264
265        let status = Command::new("kill").arg(pid.to_string()).status();
266
267        match status.await {
268            Ok(exit_status) if exit_status.success() => {
269                log::info!("ClickHouse process with PID {} stopped gracefully.", pid);
270            }
271            Ok(exit_status) => {
272                log::warn!(
273                    "pkill executed, but ClickHouse process might not have stopped. Exit status: {}",
274                    exit_status
275                );
276            }
277            Err(err) => {
278                log::error!("Failed to execute pkill for PID {}: {}", pid, err);
279            }
280        }
281    } else {
282        log::warn!("ClickHouse process PID not found in CLICKHOUSE_PROCESS.");
283    }
284}
285
286/// Synchronously stops the ClickHouse process by blocking on [`stop`].
287pub fn stop_sync() {
288    tokio::runtime::Builder::new_current_thread()
289        .enable_all()
290        .build()
291        .unwrap()
292        .block_on(stop());
293}