rustenium_core/
process.rs1use std::process::Stdio;
2
3use regex::Regex;
4use tokio::io::{AsyncBufReadExt, BufReader};
5use tokio::process::{Child, Command};
6use tokio::time::{Duration, timeout};
7
8#[derive(Debug)]
9pub struct Process {
10 child: Option<Child>,
11}
12
13impl Process {
14 fn from_command(exe: &str, mut cmd: Command) -> Process {
15 let mut child = cmd
16 .stdout(Stdio::piped())
17 .stderr(Stdio::piped())
18 .kill_on_drop(true)
19 .spawn()
20 .expect("Failed to start process");
21
22 if let Some(stdout) = child.stdout.take() {
23 let exe_name = exe.to_string();
24 tokio::spawn(async move {
25 let mut reader = BufReader::new(stdout);
26 let mut line = String::new();
27 loop {
28 line.clear();
29 match reader.read_line(&mut line).await {
30 Ok(0) => break,
31 Ok(_) => { tracing::debug!("[{} stdout] {}", exe_name, line.trim()); }
32 Err(e) => { tracing::error!("[{} stdout] Error reading: {}", exe_name, e); break; }
33 }
34 }
35 });
36 }
37
38 if let Some(stderr) = child.stderr.take() {
39 let exe_name = exe.to_string();
40 tokio::spawn(async move {
41 let mut reader = BufReader::new(stderr);
42 let mut line = String::new();
43 loop {
44 line.clear();
45 match reader.read_line(&mut line).await {
46 Ok(0) => break,
47 Ok(_) => { tracing::debug!("[{} stderr] {}", exe_name, line.trim()); }
48 Err(e) => { tracing::error!("[{} stderr] Error reading: {}", exe_name, e); break; }
49 }
50 }
51 });
52 }
53
54 Self { child: Some(child) }
55 }
56
57 pub fn create<S, I>(exe_path: S, args: I) -> Process
58 where
59 S: AsRef<str>,
60 I: IntoIterator<Item = String>,
61 {
62 let exe = exe_path.as_ref();
63 let mut cmd = Command::new(exe);
64 let args = args.into_iter().collect::<Vec<_>>();
65 tracing::info!("Starting process: '{}', args: {:?}", exe, args);
66 cmd.args(args);
67 Self::from_command(exe, cmd)
68 }
69
70 pub fn create_with_env<S, I>(exe_path: S, args: I, env: impl IntoIterator<Item = (String, String)>) -> Process
71 where
72 S: AsRef<str>,
73 I: IntoIterator<Item = String>,
74 {
75 let exe = exe_path.as_ref();
76 let mut cmd = Command::new(exe);
77 cmd.args(args.into_iter().collect::<Vec<_>>()).envs(env);
78 Self::from_command(exe, cmd)
79 }
80
81 #[deprecated]
82 pub async fn wait_for_pattern(&mut self, pattern: &str, timeout_secs: Option<u64>) -> String {
83 let timeout_secs = timeout_secs.unwrap_or(20);
84 let regex = Regex::new(pattern).expect("Invalid regex pattern");
85 let child = self.child.as_mut().unwrap();
86
87 let stdout = child.stdout.as_mut().expect("Failed to access stdout");
88 let stderr = child.stderr.as_mut().expect("Failed to access stderr");
89
90 let mut stdout_lines = BufReader::new(stdout).lines();
91 let mut stderr_lines = BufReader::new(stderr).lines();
92
93 let check_line = |_label: &str, line: Result<Option<String>, _>| -> Option<String> {
94 if let Ok(Some(line)) = line {
95 if let Some(captures) = regex.captures(&line) {
96 if let Some(url) = captures.get(1) {
97 return Some(url.as_str().into());
98 }
99 }
100 }
101 None
102 };
103
104 let timeout_duration = Duration::from_secs(timeout_secs);
105
106 let timeout_result = timeout(timeout_duration, async {
107 loop {
108 tokio::select! {
109 stdout_line = stdout_lines.next_line() => {
110 if let Some(line) = check_line("stdout", stdout_line) {
111 return Some(line);
112 }
113 },
114 stderr_line = stderr_lines.next_line() => {
115 if let Some(line) = check_line("stderr", stderr_line) {
116 return Some(line);
117 }
118 }
119 }
120 }
121 })
122 .await;
123
124 match timeout_result {
125 Ok(Some(matched)) => matched,
126 Ok(None) => panic!("Found a pattern but None"),
127 Err(_) => panic!("Timeout reached without finding pattern"),
128 }
129 }
130}
131
132impl Process {
133 pub fn kill(&mut self) -> Result<(), crate::error::ProcessKillError> {
134 if let Some(mut child) = self.child.take() {
135 if let Some(pid) = child.id() {
136 let pid_str = pid.to_string();
137 tracing::debug!("[Process]: Killing process, PID: {}", pid_str);
138
139 #[cfg(unix)]
140 {
141 match std::process::Command::new("pkill")
142 .args(["-9", "-P", &pid_str])
143 .output()
144 {
145 Ok(output) => {
146 tracing::debug!(
147 "[Process]: pkill stdout: {}",
148 String::from_utf8_lossy(&output.stdout)
149 );
150 tracing::debug!(
151 "[Process]: pkill stderr: {}",
152 String::from_utf8_lossy(&output.stderr)
153 );
154 }
155 Err(e) => {
156 tracing::error!("[Process]: Failed to execute pkill: {}", e);
157 }
158 }
159 }
160
161 #[cfg(windows)]
162 {
163 match std::process::Command::new("taskkill")
164 .args(["/F", "/T", "/PID", &pid_str])
165 .output()
166 {
167 Ok(output) => {
168 tracing::debug!(
169 "[Process]: taskkill stdout: {}",
170 String::from_utf8_lossy(&output.stdout)
171 );
172 tracing::debug!(
173 "[Process]: taskkill stderr: {}",
174 String::from_utf8_lossy(&output.stderr)
175 );
176 }
177 Err(e) => {
178 tracing::error!("[Process]: Failed to execute taskkill: {}", e);
179 }
180 }
181 }
182 }
183
184 child
185 .start_kill()
186 .map_err(|_| crate::error::ProcessKillError)?;
187
188 Ok(())
189 } else {
190 Err(crate::error::ProcessKillError)
191 }
192 }
193}
194
195impl Drop for Process {
196 fn drop(&mut self) {
197 let _ = self.kill();
198 }
199}
200
201pub fn kill_process_on_port(port: u16) {
204 tracing::debug!("[Process]: Killing process on port {}", port);
205
206 #[cfg(windows)]
207 {
208 let Ok(out) = std::process::Command::new("netstat").args(["-ano"]).output() else { return };
210 let stdout = String::from_utf8_lossy(&out.stdout);
211 let needle = format!(":{}", port);
212 for line in stdout.lines() {
213 if line.contains(&needle) && line.contains("LISTENING") {
214 if let Some(pid_str) = line.split_whitespace().last() {
215 tracing::debug!("[Process]: Found PID {} on port {}, killing", pid_str, port);
216 let _ = std::process::Command::new("taskkill")
217 .args(["/F", "/T", "/PID", pid_str])
218 .output();
219 }
220 }
221 }
222 }
223
224 #[cfg(unix)]
225 {
226 let _ = std::process::Command::new("fuser")
227 .args(["-k", &format!("{}/tcp", port)])
228 .output();
229 }
230}