1mod fake;
6
7pub use fake::FakeProcessRunner;
8
9pub use perfgate_error::AdapterError;
10use perfgate_sha256::sha256_hex;
11use std::path::{Path, PathBuf};
12use std::time::{Duration, Instant};
13
14#[cfg(windows)]
15use std::os::windows::io::AsRawHandle;
16
17#[derive(Debug, Clone, Default)]
19pub struct CommandSpec {
20 pub name: String,
21 pub cwd: Option<PathBuf>,
22 pub argv: Vec<String>,
23 pub env: Vec<(String, String)>,
24 pub timeout: Option<Duration>,
25 pub output_cap_bytes: usize,
26}
27
28#[derive(Debug, Clone, Default)]
30pub struct RunResult {
31 pub wall_ms: u64,
32 pub exit_code: i32,
33 pub timed_out: bool,
34 pub cpu_ms: Option<u64>,
37 pub page_faults: Option<u64>,
39 pub ctx_switches: Option<u64>,
41 pub max_rss_kb: Option<u64>,
44 pub io_read_bytes: Option<u64>,
46 pub io_write_bytes: Option<u64>,
48 pub network_packets: Option<u64>,
50 pub energy_uj: Option<u64>,
52 pub binary_bytes: Option<u64>,
54 pub stdout: Vec<u8>,
55 pub stderr: Vec<u8>,
56}
57
58pub trait ProcessRunner {
59 fn run(&self, spec: &CommandSpec) -> Result<RunResult, AdapterError>;
60}
61
62fn truncate(mut bytes: Vec<u8>, cap: usize) -> Vec<u8> {
64 if cap > 0 && bytes.len() > cap {
65 bytes.truncate(cap);
66 }
67 bytes
68}
69
70#[cfg(all(not(unix), not(windows)))]
71fn run_portable(spec: &CommandSpec) -> Result<RunResult, AdapterError> {
72 use std::process::Command;
73
74 let start = Instant::now();
75 let binary_bytes = binary_bytes_for_command(spec);
76 let mut cmd = Command::new(&spec.argv[0]);
77 if spec.argv.len() > 1 {
78 cmd.args(&spec.argv[1..]);
79 }
80
81 if let Some(cwd) = &spec.cwd {
82 cmd.current_dir(cwd);
83 }
84
85 for (k, v) in &spec.env {
86 cmd.env(k, v);
87 }
88
89 let out = cmd.output().map_err(|e| AdapterError::RunCommand {
90 command: spec.argv.join(" "),
91 reason: e.to_string(),
92 })?;
93
94 let wall_ms = start.elapsed().as_millis() as u64;
95 let exit_code = out.status.code().unwrap_or(-1);
96
97 Ok(RunResult {
98 wall_ms,
99 exit_code,
100 timed_out: false,
101 cpu_ms: None,
102 page_faults: None,
103 ctx_switches: None,
104 max_rss_kb: None,
105 io_read_bytes: None,
106 io_write_bytes: None,
107 network_packets: None,
108 energy_uj: None,
109 binary_bytes,
110 stdout: truncate(out.stdout, spec.output_cap_bytes),
111 stderr: truncate(out.stderr, spec.output_cap_bytes),
112 })
113}
114
115#[cfg(windows)]
116fn run_windows(spec: &CommandSpec) -> Result<RunResult, AdapterError> {
117 use std::process::{Command, Stdio};
118
119 let start = Instant::now();
120 let binary_bytes = binary_bytes_for_command(spec);
121
122 let mut cmd = Command::new(&spec.argv[0]);
123 if spec.argv.len() > 1 {
124 cmd.args(&spec.argv[1..]);
125 }
126
127 if let Some(cwd) = &spec.cwd {
128 cmd.current_dir(cwd);
129 }
130
131 for (k, v) in &spec.env {
132 cmd.env(k, v);
133 }
134
135 cmd.stdout(Stdio::piped());
136 cmd.stderr(Stdio::piped());
137
138 let mut child = cmd.spawn().map_err(|e| AdapterError::RunCommand {
139 command: spec.argv.join(" "),
140 reason: e.to_string(),
141 })?;
142
143 let exit_status = if let Some(timeout) = spec.timeout {
144 match child
145 .wait_timeout(timeout)
146 .map_err(|e| AdapterError::Other(e.to_string()))?
147 {
148 Some(status) => status,
149 None => {
150 child.kill().ok();
151 return Err(AdapterError::Timeout);
152 }
153 }
154 } else {
155 child
156 .wait()
157 .map_err(|e| AdapterError::Other(e.to_string()))?
158 };
159
160 let wall_ms = start.elapsed().as_millis() as u64;
161 let exit_code = exit_status.code().unwrap_or(-1);
162
163 let mut stdout_buf = Vec::new();
164 let mut stderr_buf = Vec::new();
165
166 let handle = child.as_raw_handle();
169 let max_rss_kb = get_max_rss_windows(handle);
170 let (io_read_bytes, io_write_bytes) = get_io_counters_windows(handle);
171
172 if let Some(mut stdout) = child.stdout.take() {
173 use std::io::Read;
174 stdout.read_to_end(&mut stdout_buf).ok();
175 }
176 if let Some(mut stderr) = child.stderr.take() {
177 use std::io::Read;
178 stderr.read_to_end(&mut stderr_buf).ok();
179 }
180
181 Ok(RunResult {
182 wall_ms,
183 exit_code,
184 timed_out: false,
185 cpu_ms: None,
186 page_faults: None,
187 ctx_switches: None,
188 max_rss_kb,
189 io_read_bytes,
190 io_write_bytes,
191 network_packets: None,
192 energy_uj: None,
193 binary_bytes,
194 stdout: truncate(stdout_buf, spec.output_cap_bytes),
195 stderr: truncate(stderr_buf, spec.output_cap_bytes),
196 })
197}
198
199#[cfg(windows)]
200fn get_max_rss_windows(handle: std::os::windows::io::RawHandle) -> Option<u64> {
201 use windows::Win32::Foundation::HANDLE;
202 use windows::Win32::System::ProcessStatus::{GetProcessMemoryInfo, PROCESS_MEMORY_COUNTERS};
203
204 let mut counters = PROCESS_MEMORY_COUNTERS::default();
205 unsafe {
206 if GetProcessMemoryInfo(
207 HANDLE(handle as _),
208 &mut counters,
209 std::mem::size_of::<PROCESS_MEMORY_COUNTERS>() as u32,
210 )
211 .is_ok()
212 {
213 Some((counters.PeakWorkingSetSize / 1024) as u64)
214 } else {
215 None
216 }
217 }
218}
219
220#[cfg(windows)]
221fn get_io_counters_windows(handle: std::os::windows::io::RawHandle) -> (Option<u64>, Option<u64>) {
222 use windows::Win32::Foundation::HANDLE;
223 use windows::Win32::System::Threading::{GetProcessIoCounters, IO_COUNTERS};
224
225 let mut counters = IO_COUNTERS::default();
226 unsafe {
227 if GetProcessIoCounters(HANDLE(handle as _), &mut counters).is_ok() {
228 (
229 Some(counters.ReadTransferCount),
230 Some(counters.WriteTransferCount),
231 )
232 } else {
233 (None, None)
234 }
235 }
236}
237
238#[cfg(unix)]
239fn run_unix(spec: &CommandSpec) -> Result<RunResult, AdapterError> {
240 use std::os::unix::process::ExitStatusExt;
241 use std::process::{Command, Stdio};
242
243 let binary_bytes = binary_bytes_for_command(spec);
244 let mut cmd = Command::new(&spec.argv[0]);
245 if spec.argv.len() > 1 {
246 cmd.args(&spec.argv[1..]);
247 }
248
249 if let Some(cwd) = &spec.cwd {
250 cmd.current_dir(cwd);
251 }
252
253 for (k, v) in &spec.env {
254 cmd.env(k, v);
255 }
256
257 cmd.stdout(Stdio::piped());
258 cmd.stderr(Stdio::piped());
259
260 let mut usage_before = unsafe { std::mem::zeroed::<libc::rusage>() };
261 let _ = unsafe { libc::getrusage(libc::RUSAGE_CHILDREN, &mut usage_before) };
262
263 let start = Instant::now();
264
265 let out = if let Some(timeout) = spec.timeout {
266 let mut child = cmd.spawn().map_err(|e| AdapterError::RunCommand {
267 command: spec.argv.join(" "),
268 reason: e.to_string(),
269 })?;
270
271 match child
272 .wait_timeout(timeout)
273 .map_err(|e| AdapterError::Other(e.to_string()))?
274 {
275 Some(_) => child
276 .wait_with_output()
277 .map_err(|e| AdapterError::RunCommand {
278 command: spec.argv.join(" "),
279 reason: e.to_string(),
280 })?,
281 None => {
282 child.kill().ok();
283 return Err(AdapterError::Timeout);
284 }
285 }
286 } else {
287 cmd.output().map_err(|e| AdapterError::RunCommand {
288 command: spec.argv.join(" "),
289 reason: e.to_string(),
290 })?
291 };
292
293 let wall_ms = start.elapsed().as_millis() as u64;
294 let exit_code = out
295 .status
296 .code()
297 .or_else(|| out.status.signal())
298 .unwrap_or(-1);
299
300 let mut cpu_ms = None;
301 let mut max_rss_kb = None;
302 let mut page_faults = None;
303 let mut ctx_switches = None;
304
305 let mut usage_after = unsafe { std::mem::zeroed::<libc::rusage>() };
306 if unsafe { libc::getrusage(libc::RUSAGE_CHILDREN, &mut usage_after) } == 0 {
307 let user_ms = diff_timeval_ms(usage_after.ru_utime, usage_before.ru_utime);
308 let sys_ms = diff_timeval_ms(usage_after.ru_stime, usage_before.ru_stime);
309
310 cpu_ms = Some(user_ms.saturating_add(sys_ms));
311 max_rss_kb = Some(usage_after.ru_maxrss as u64);
312 page_faults =
313 Some((usage_after.ru_majflt as u64).saturating_sub(usage_before.ru_majflt as u64));
314 ctx_switches = Some(
315 (usage_after.ru_nvcsw as u64)
316 .saturating_sub(usage_before.ru_nvcsw as u64)
317 .saturating_add(
318 (usage_after.ru_nivcsw as u64).saturating_sub(usage_before.ru_nivcsw as u64),
319 ),
320 );
321 }
322
323 Ok(RunResult {
324 wall_ms,
325 exit_code,
326 timed_out: false,
327 cpu_ms,
328 page_faults,
329 ctx_switches,
330 max_rss_kb,
331 io_read_bytes: None,
332 io_write_bytes: None,
333 network_packets: None,
334 energy_uj: None,
335 binary_bytes,
336 stdout: truncate(out.stdout, spec.output_cap_bytes),
337 stderr: truncate(out.stderr, spec.output_cap_bytes),
338 })
339}
340
341#[derive(Clone, Debug, Default)]
343pub struct StdProcessRunner;
344
345impl ProcessRunner for StdProcessRunner {
346 fn run(&self, spec: &CommandSpec) -> Result<RunResult, AdapterError> {
347 if spec.argv.is_empty() {
348 return Err(AdapterError::EmptyArgv);
349 }
350
351 #[cfg(windows)]
352 {
353 run_windows(spec)
354 }
355 #[cfg(unix)]
356 {
357 run_unix(spec)
358 }
359 #[cfg(all(not(unix), not(windows)))]
360 {
361 run_portable(spec)
362 }
363 }
364}
365
366pub trait HostProbe {
368 fn probe(&self, options: &HostProbeOptions) -> perfgate_types::HostInfo;
369}
370
371#[derive(Debug, Clone, Default)]
372pub struct HostProbeOptions {
373 pub include_hostname_hash: bool,
374}
375
376#[derive(Clone, Debug, Default)]
377pub struct StdHostProbe;
378
379impl HostProbe for StdHostProbe {
380 fn probe(&self, options: &HostProbeOptions) -> perfgate_types::HostInfo {
381 let hostname_hash = if options.include_hostname_hash {
382 hostname::get()
383 .ok()
384 .map(|h| sha256_hex(h.to_string_lossy().as_bytes()))
385 } else {
386 None
387 };
388
389 perfgate_types::HostInfo {
390 os: std::env::consts::OS.to_string(),
391 arch: std::env::consts::ARCH.to_string(),
392 cpu_count: Some(num_cpus::get_physical() as u32),
393 memory_bytes: Some(get_total_memory()),
394 hostname_hash,
395 }
396 }
397}
398
399fn get_total_memory() -> u64 {
400 #[cfg(windows)]
401 {
402 use windows::Win32::System::SystemInformation::{GlobalMemoryStatusEx, MEMORYSTATUSEX};
403 let mut mem_status = MEMORYSTATUSEX {
404 dwLength: std::mem::size_of::<MEMORYSTATUSEX>() as u32,
405 ..Default::default()
406 };
407 unsafe {
408 if GlobalMemoryStatusEx(&mut mem_status).is_ok() {
409 mem_status.ullTotalPhys
410 } else {
411 0
412 }
413 }
414 }
415 #[cfg(unix)]
416 {
417 0
419 }
420 #[cfg(all(not(unix), not(windows)))]
421 {
422 0
423 }
424}
425
426fn binary_bytes_for_command(spec: &CommandSpec) -> Option<u64> {
427 spec.argv.first().and_then(|cmd| {
428 let path = Path::new(cmd);
429 if path.exists() {
430 std::fs::metadata(path).ok().map(|m| m.len())
431 } else {
432 which::which(cmd)
434 .ok()
435 .and_then(|p| std::fs::metadata(p).ok().map(|m| m.len()))
436 }
437 })
438}
439
440trait CommandTimeoutExt {
442 fn wait_timeout(
443 &mut self,
444 timeout: Duration,
445 ) -> std::io::Result<Option<std::process::ExitStatus>>;
446}
447
448#[cfg(windows)]
449impl CommandTimeoutExt for std::process::Child {
450 fn wait_timeout(
451 &mut self,
452 timeout: Duration,
453 ) -> std::io::Result<Option<std::process::ExitStatus>> {
454 use windows::Win32::Foundation::{HANDLE, WAIT_OBJECT_0, WAIT_TIMEOUT};
455 use windows::Win32::System::Threading::WaitForSingleObject;
456
457 let handle = self.as_raw_handle();
458 let millis = timeout.as_millis() as u32;
459
460 unsafe {
461 let res = WaitForSingleObject(HANDLE(handle as _), millis);
462 if res == WAIT_OBJECT_0 {
463 self.wait().map(Some)
464 } else if res == WAIT_TIMEOUT {
465 Ok(None)
466 } else {
467 Err(std::io::Error::last_os_error())
468 }
469 }
470 }
471}
472
473#[cfg(unix)]
474impl CommandTimeoutExt for std::process::Child {
475 fn wait_timeout(
476 &mut self,
477 timeout: Duration,
478 ) -> std::io::Result<Option<std::process::ExitStatus>> {
479 let start = Instant::now();
480 while start.elapsed() < timeout {
481 if let Some(status) = self.try_wait()? {
482 return Ok(Some(status));
483 }
484 std::thread::sleep(Duration::from_millis(10));
485 }
486 Ok(None)
487 }
488}
489
490#[cfg(unix)]
491fn diff_timeval_ms(after: libc::timeval, before: libc::timeval) -> u64 {
492 #[allow(clippy::unnecessary_cast)]
493 let mut sec = after.tv_sec as i64 - before.tv_sec as i64;
494 #[allow(clippy::unnecessary_cast)]
495 let mut usec = after.tv_usec as i64 - before.tv_usec as i64;
496
497 if usec < 0 {
498 sec -= 1;
499 usec += 1_000_000;
500 }
501
502 if sec < 0 {
504 return 0;
505 }
506
507 (sec as u64) * 1000 + (usec as u64) / 1000
508}
509
510#[cfg(test)]
511mod tests {
512 use super::*;
513
514 #[test]
515 fn truncate_works() {
516 let data = vec![1, 2, 3, 4, 5];
517 assert_eq!(truncate(data.clone(), 3), vec![1, 2, 3]);
518 assert_eq!(truncate(data.clone(), 10), data);
519 assert_eq!(truncate(data.clone(), 0), data);
520 }
521
522 #[test]
524 fn read_with_cap_truncates() {
525 fn read_with_cap<R: std::io::Read>(reader: &mut R, cap: usize) -> Vec<u8> {
526 let mut buf = vec![0u8; cap];
527 let n = reader.read(&mut buf).unwrap();
528 buf.truncate(n);
529 buf
530 }
531
532 let mut reader: &[u8] = b"hello world";
533 let result = read_with_cap(&mut reader, 5);
534 assert_eq!(result, b"hello");
535 }
536}