1mod fake;
25
26pub use fake::FakeProcessRunner;
27
28pub use perfgate_error::AdapterError;
29use perfgate_sha256::sha256_hex;
30use std::path::{Path, PathBuf};
31use std::time::{Duration, Instant};
32
33#[cfg(windows)]
34use std::os::windows::io::AsRawHandle;
35
/// Description of a single command invocation handed to a [`ProcessRunner`].
#[derive(Debug, Clone, Default)]
pub struct CommandSpec {
    /// Label for the command. The runners visible in this file never read it;
    /// presumably it is used for reporting elsewhere.
    pub name: String,
    /// Working directory for the child process; `None` leaves the parent's directory in effect.
    pub cwd: Option<PathBuf>,
    /// Program (`argv[0]`) followed by its arguments. Must not be empty when run.
    pub argv: Vec<String>,
    /// Extra environment variables set on the child as `(key, value)` pairs.
    pub env: Vec<(String, String)>,
    /// Optional limit on how long the child may run. The Unix and Windows runners kill the
    /// child and return `AdapterError::Timeout` when it expires; the portable fallback
    /// runner does not consult it.
    pub timeout: Option<Duration>,
    /// Maximum number of bytes kept from each of stdout and stderr; `0` means no limit.
    pub output_cap_bytes: usize,
}
46
/// Measurements and captured output from one finished command run.
///
/// Metrics that a platform runner cannot collect are left as `None`.
#[derive(Debug, Clone, Default)]
pub struct RunResult {
    /// Elapsed wall-clock time in milliseconds.
    pub wall_ms: u64,
    /// Exit code of the child. The Unix runner substitutes the terminating signal number
    /// when there is no exit code; `-1` is used when neither is available.
    pub exit_code: i32,
    /// The runners in this file always set this to `false`; an expired timeout is
    /// reported as `AdapterError::Timeout` instead of a `RunResult`.
    pub timed_out: bool,
    /// User plus system CPU time in milliseconds (Unix runner only).
    pub cpu_ms: Option<u64>,
    /// Page-fault count: major faults on Unix, `PageFaultCount` on Windows.
    pub page_faults: Option<u64>,
    /// Voluntary plus involuntary context switches (Unix runner only).
    pub ctx_switches: Option<u64>,
    /// Peak resident set size: `ru_maxrss` on Unix, peak working set / 1024 on Windows.
    pub max_rss_kb: Option<u64>,
    /// Bytes read, from the process I/O counters (Windows runner only).
    pub io_read_bytes: Option<u64>,
    /// Bytes written, from the process I/O counters (Windows runner only).
    pub io_write_bytes: Option<u64>,
    /// Never populated by the runners in this file.
    pub network_packets: Option<u64>,
    /// Never populated by the runners in this file.
    pub energy_uj: Option<u64>,
    /// Size in bytes of the executable file resolved for `argv[0]`, when it could be found.
    pub binary_bytes: Option<u64>,
    /// Captured standard output, clipped to `CommandSpec::output_cap_bytes`.
    pub stdout: Vec<u8>,
    /// Captured standard error, clipped to `CommandSpec::output_cap_bytes`.
    pub stderr: Vec<u8>,
}
77
/// Abstraction over "run this command and measure it".
///
/// [`StdProcessRunner`] is the implementation in this file that launches real processes.
pub trait ProcessRunner {
    /// Executes `spec` and returns its measurements, or an [`AdapterError`] when the
    /// command could not be run.
    fn run(&self, spec: &CommandSpec) -> Result<RunResult, AdapterError>;
}
81
/// Clips `bytes` to at most `cap` bytes and hands the buffer back.
///
/// A `cap` of `0` means "no limit": the input is returned untouched.
fn truncate(mut bytes: Vec<u8>, cap: usize) -> Vec<u8> {
    if cap == 0 {
        return bytes;
    }
    // `Vec::truncate` is already a no-op when `cap` is at or beyond the current length.
    bytes.truncate(cap);
    bytes
}
89
/// Fallback runner for targets that are neither Unix nor Windows.
///
/// Runs `spec.argv` to completion via `Command::output`, honouring `spec.cwd` and
/// `spec.env`. `spec.timeout` is not consulted here. Only wall time, exit code, binary
/// size and the (capped) captured output are reported; every other metric is `None`.
///
/// # Errors
/// Returns [`AdapterError::RunCommand`] when the process cannot be launched or its output
/// cannot be collected.
///
/// # Panics
/// Indexes `spec.argv[0]`, so callers must reject an empty `argv` beforehand.
#[cfg(all(not(unix), not(windows)))]
fn run_portable(spec: &CommandSpec) -> Result<RunResult, AdapterError> {
    use std::process::Command;

    let started = Instant::now();
    let binary_bytes = binary_bytes_for_command(spec);

    let mut command = Command::new(&spec.argv[0]);
    // The tail slice is empty for a bare program name, which makes `args` a no-op.
    command.args(&spec.argv[1..]);
    if let Some(dir) = spec.cwd.as_ref() {
        command.current_dir(dir);
    }
    command.envs(spec.env.iter().map(|(key, value)| (key, value)));

    let output = match command.output() {
        Ok(output) => output,
        Err(err) => {
            return Err(AdapterError::RunCommand {
                command: spec.argv.join(" "),
                reason: err.to_string(),
            });
        }
    };

    let wall_ms = started.elapsed().as_millis() as u64;

    Ok(RunResult {
        wall_ms,
        exit_code: output.status.code().unwrap_or(-1),
        binary_bytes,
        stdout: truncate(output.stdout, spec.output_cap_bytes),
        stderr: truncate(output.stderr, spec.output_cap_bytes),
        // `timed_out` stays false and every remaining metric stays `None`.
        ..RunResult::default()
    })
}
134
/// Windows runner: launches `spec.argv`, waits for it (optionally bounded by
/// `spec.timeout`) and collects wall time, exit code, peak working set, page faults and
/// I/O transfer counters together with the captured output.
///
/// stdout and stderr are drained on background threads *while* the child runs. Waiting
/// for exit before reading the pipes would block any child that writes more than the
/// pipe buffer holds: it would hang forever, or be reported as a timeout.
///
/// # Errors
/// * [`AdapterError::RunCommand`] when the process cannot be spawned.
/// * [`AdapterError::Other`] when waiting on the child fails.
/// * [`AdapterError::Timeout`] when `spec.timeout` expires; the child is killed and reaped.
///
/// # Panics
/// Indexes `spec.argv[0]`, so callers must reject an empty `argv` beforehand.
#[cfg(windows)]
fn run_windows(spec: &CommandSpec) -> Result<RunResult, AdapterError> {
    use std::io::Read;
    use std::process::{Command, Stdio};
    use std::thread::JoinHandle;

    /// Reads `pipe` to EOF on a helper thread so the child never stalls on a full pipe.
    fn drain_in_background<R>(mut pipe: R) -> JoinHandle<Vec<u8>>
    where
        R: Read + Send + 'static,
    {
        std::thread::spawn(move || {
            let mut buf = Vec::new();
            // Best effort, as before: keep whatever was read even if the read fails.
            pipe.read_to_end(&mut buf).ok();
            buf
        })
    }

    let start = Instant::now();
    let binary_bytes = binary_bytes_for_command(spec);

    let mut cmd = Command::new(&spec.argv[0]);
    // The tail slice is empty for a bare program name, which makes `args` a no-op.
    cmd.args(&spec.argv[1..]);

    if let Some(cwd) = &spec.cwd {
        cmd.current_dir(cwd);
    }

    for (k, v) in &spec.env {
        cmd.env(k, v);
    }

    cmd.stdout(Stdio::piped());
    cmd.stderr(Stdio::piped());

    let mut child = cmd.spawn().map_err(|e| AdapterError::RunCommand {
        command: spec.argv.join(" "),
        reason: e.to_string(),
    })?;

    // Start draining before waiting so a chatty child cannot deadlock against us.
    let stdout_reader = child.stdout.take().map(drain_in_background);
    let stderr_reader = child.stderr.take().map(drain_in_background);

    let exit_status = if let Some(timeout) = spec.timeout {
        match child
            .wait_timeout(timeout)
            .map_err(|e| AdapterError::Other(e.to_string()))?
        {
            Some(status) => status,
            None => {
                child.kill().ok();
                // Reap the killed child. The reader threads are left detached; they end
                // once the pipes close.
                child.wait().ok();
                return Err(AdapterError::Timeout);
            }
        }
    } else {
        child
            .wait()
            .map_err(|e| AdapterError::Other(e.to_string()))?
    };

    let wall_ms = start.elapsed().as_millis() as u64;
    let exit_code = exit_status.code().unwrap_or(-1);

    // The process handle stays valid until `child` is dropped, so the counters can still
    // be queried after the process has exited.
    let handle = child.as_raw_handle();
    let (max_rss_kb, page_faults) = get_memory_info_windows(handle);
    let (io_read_bytes, io_write_bytes) = get_io_counters_windows(handle);

    // A reader that panicked contributes an empty buffer rather than failing the run.
    let stdout_buf = stdout_reader
        .and_then(|reader| reader.join().ok())
        .unwrap_or_default();
    let stderr_buf = stderr_reader
        .and_then(|reader| reader.join().ok())
        .unwrap_or_default();

    Ok(RunResult {
        wall_ms,
        exit_code,
        timed_out: false,
        cpu_ms: None,
        page_faults,
        ctx_switches: None,
        max_rss_kb,
        io_read_bytes,
        io_write_bytes,
        network_packets: None,
        energy_uj: None,
        binary_bytes,
        stdout: truncate(stdout_buf, spec.output_cap_bytes),
        stderr: truncate(stderr_buf, spec.output_cap_bytes),
    })
}
218
/// Queries `GetProcessMemoryInfo` for the process behind `handle`.
///
/// Returns `(peak working set in KB, page-fault count)`, or `(None, None)` when the
/// query fails.
#[cfg(windows)]
fn get_memory_info_windows(handle: std::os::windows::io::RawHandle) -> (Option<u64>, Option<u64>) {
    use windows::Win32::Foundation::HANDLE;
    use windows::Win32::System::ProcessStatus::{GetProcessMemoryInfo, PROCESS_MEMORY_COUNTERS};

    let mut mem = PROCESS_MEMORY_COUNTERS::default();
    let struct_size = std::mem::size_of::<PROCESS_MEMORY_COUNTERS>() as u32;

    // SAFETY: the call only writes into `mem`, a live local whose size is passed alongside
    // it. `handle` is assumed to be a valid process handle supplied by the caller.
    let fetched =
        unsafe { GetProcessMemoryInfo(HANDLE(handle as _), &mut mem, struct_size) }.is_ok();
    if !fetched {
        return (None, None);
    }

    let peak_kb = (mem.PeakWorkingSetSize / 1024) as u64;
    let faults = mem.PageFaultCount as u64;
    (Some(peak_kb), Some(faults))
}
243
/// Queries `GetProcessIoCounters` for the process behind `handle`.
///
/// Returns `(read transfer count, write transfer count)`, or `(None, None)` when the
/// query fails.
#[cfg(windows)]
fn get_io_counters_windows(handle: std::os::windows::io::RawHandle) -> (Option<u64>, Option<u64>) {
    use windows::Win32::Foundation::HANDLE;
    use windows::Win32::System::Threading::{GetProcessIoCounters, IO_COUNTERS};

    let mut io = IO_COUNTERS::default();

    // SAFETY: the call only writes into `io`, a live local of the type the API expects.
    // `handle` is assumed to be a valid process handle supplied by the caller.
    let fetched = unsafe { GetProcessIoCounters(HANDLE(handle as _), &mut io) }.is_ok();
    if !fetched {
        return (None, None);
    }

    (Some(io.ReadTransferCount), Some(io.WriteTransferCount))
}
261
262#[cfg(unix)]
263fn run_unix(spec: &CommandSpec) -> Result<RunResult, AdapterError> {
264 use std::os::unix::process::ExitStatusExt;
265 use std::process::{Command, Stdio};
266
267 let binary_bytes = binary_bytes_for_command(spec);
268 let mut cmd = Command::new(&spec.argv[0]);
269 if spec.argv.len() > 1 {
270 cmd.args(&spec.argv[1..]);
271 }
272
273 if let Some(cwd) = &spec.cwd {
274 cmd.current_dir(cwd);
275 }
276
277 for (k, v) in &spec.env {
278 cmd.env(k, v);
279 }
280
281 cmd.stdout(Stdio::piped());
282 cmd.stderr(Stdio::piped());
283
284 let mut usage_before = unsafe { std::mem::zeroed::<libc::rusage>() };
285 let _ = unsafe { libc::getrusage(libc::RUSAGE_CHILDREN, &mut usage_before) };
286
287 let start = Instant::now();
288
289 let out = if let Some(timeout) = spec.timeout {
290 let mut child = cmd.spawn().map_err(|e| AdapterError::RunCommand {
291 command: spec.argv.join(" "),
292 reason: e.to_string(),
293 })?;
294
295 match child
296 .wait_timeout(timeout)
297 .map_err(|e| AdapterError::Other(e.to_string()))?
298 {
299 Some(_) => child
300 .wait_with_output()
301 .map_err(|e| AdapterError::RunCommand {
302 command: spec.argv.join(" "),
303 reason: e.to_string(),
304 })?,
305 None => {
306 child.kill().ok();
307 return Err(AdapterError::Timeout);
308 }
309 }
310 } else {
311 cmd.output().map_err(|e| AdapterError::RunCommand {
312 command: spec.argv.join(" "),
313 reason: e.to_string(),
314 })?
315 };
316
317 let wall_ms = start.elapsed().as_millis() as u64;
318 let exit_code = out
319 .status
320 .code()
321 .or_else(|| out.status.signal())
322 .unwrap_or(-1);
323
324 let mut cpu_ms = None;
325 let mut max_rss_kb = None;
326 let mut page_faults = None;
327 let mut ctx_switches = None;
328
329 let mut usage_after = unsafe { std::mem::zeroed::<libc::rusage>() };
330 if unsafe { libc::getrusage(libc::RUSAGE_CHILDREN, &mut usage_after) } == 0 {
331 let user_ms = diff_timeval_ms(usage_after.ru_utime, usage_before.ru_utime);
332 let sys_ms = diff_timeval_ms(usage_after.ru_stime, usage_before.ru_stime);
333
334 cpu_ms = Some(user_ms.saturating_add(sys_ms));
335 max_rss_kb = Some(usage_after.ru_maxrss as u64);
336 page_faults =
337 Some((usage_after.ru_majflt as u64).saturating_sub(usage_before.ru_majflt as u64));
338 ctx_switches = Some(
339 (usage_after.ru_nvcsw as u64)
340 .saturating_sub(usage_before.ru_nvcsw as u64)
341 .saturating_add(
342 (usage_after.ru_nivcsw as u64).saturating_sub(usage_before.ru_nivcsw as u64),
343 ),
344 );
345 }
346
347 Ok(RunResult {
348 wall_ms,
349 exit_code,
350 timed_out: false,
351 cpu_ms,
352 page_faults,
353 ctx_switches,
354 max_rss_kb,
355 io_read_bytes: None,
356 io_write_bytes: None,
357 network_packets: None,
358 energy_uj: None,
359 binary_bytes,
360 stdout: truncate(out.stdout, spec.output_cap_bytes),
361 stderr: truncate(out.stderr, spec.output_cap_bytes),
362 })
363}
364
365#[derive(Clone, Debug, Default)]
367pub struct StdProcessRunner;
368
369impl ProcessRunner for StdProcessRunner {
370 fn run(&self, spec: &CommandSpec) -> Result<RunResult, AdapterError> {
371 if spec.argv.is_empty() {
372 return Err(AdapterError::EmptyArgv);
373 }
374
375 #[cfg(windows)]
376 {
377 run_windows(spec)
378 }
379 #[cfg(unix)]
380 {
381 run_unix(spec)
382 }
383 #[cfg(all(not(unix), not(windows)))]
384 {
385 run_portable(spec)
386 }
387 }
388}
389
/// Source of information about the machine the measurements are taken on.
pub trait HostProbe {
    /// Collects a `perfgate_types::HostInfo` snapshot, shaped by `options`.
    fn probe(&self, options: &HostProbeOptions) -> perfgate_types::HostInfo;
}
394
/// Switches controlling what a [`HostProbe`] collects.
#[derive(Debug, Clone, Default)]
pub struct HostProbeOptions {
    /// When `true`, `StdHostProbe` reports a SHA-256 hex digest of the hostname;
    /// when `false` (the default) the hostname hash is left as `None`.
    pub include_hostname_hash: bool,
}
399
400#[derive(Clone, Debug, Default)]
401pub struct StdHostProbe;
402
403impl HostProbe for StdHostProbe {
404 fn probe(&self, options: &HostProbeOptions) -> perfgate_types::HostInfo {
405 let hostname_hash = if options.include_hostname_hash {
406 hostname::get()
407 .ok()
408 .map(|h| sha256_hex(h.to_string_lossy().as_bytes()))
409 } else {
410 None
411 };
412
413 perfgate_types::HostInfo {
414 os: std::env::consts::OS.to_string(),
415 arch: std::env::consts::ARCH.to_string(),
416 cpu_count: Some(num_cpus::get_physical() as u32),
417 memory_bytes: Some(get_total_memory()),
418 hostname_hash,
419 }
420 }
421}
422
/// Returns the machine's total memory in bytes, or `0` when it cannot be determined.
///
/// * Windows: `ullTotalPhys` from `GlobalMemoryStatusEx`.
/// * Linux / Android: the `MemTotal` line of `/proc/meminfo` (given in kB) times 1024.
/// * Everything else: `0`, because no lookup is implemented.
fn get_total_memory() -> u64 {
    #[cfg(windows)]
    {
        use windows::Win32::System::SystemInformation::{GlobalMemoryStatusEx, MEMORYSTATUSEX};
        let mut mem_status = MEMORYSTATUSEX {
            // The API requires the caller to fill in the structure size before the call.
            dwLength: std::mem::size_of::<MEMORYSTATUSEX>() as u32,
            ..Default::default()
        };
        // SAFETY: `mem_status` is a live local with `dwLength` set to its own size.
        unsafe {
            if GlobalMemoryStatusEx(&mut mem_status).is_ok() {
                mem_status.ullTotalPhys
            } else {
                0
            }
        }
    }
    #[cfg(any(target_os = "linux", target_os = "android"))]
    {
        // The line looks like `MemTotal:       16384256 kB`.
        std::fs::read_to_string("/proc/meminfo")
            .ok()
            .and_then(|meminfo| {
                meminfo.lines().find_map(|line| {
                    let value = line.strip_prefix("MemTotal:")?;
                    let kib: u64 = value.split_whitespace().next()?.parse().ok()?;
                    kib.checked_mul(1024)
                })
            })
            .unwrap_or(0)
    }
    #[cfg(all(unix, not(any(target_os = "linux", target_os = "android"))))]
    {
        0
    }
    #[cfg(all(not(unix), not(windows)))]
    {
        0
    }
}
449
450fn binary_bytes_for_command(spec: &CommandSpec) -> Option<u64> {
451 spec.argv.first().and_then(|cmd| {
452 let path = Path::new(cmd);
453 if path.exists() {
454 std::fs::metadata(path).ok().map(|m| m.len())
455 } else {
456 which::which(cmd)
458 .ok()
459 .and_then(|p| std::fs::metadata(p).ok().map(|m| m.len()))
460 }
461 })
462}
463
/// Adds a bounded wait to child processes.
trait CommandTimeoutExt {
    /// Waits up to `timeout` for the process to exit.
    ///
    /// Returns `Ok(Some(status))` if the process exited in time and `Ok(None)` if it was
    /// still running at the deadline. The caller decides whether to kill it.
    fn wait_timeout(
        &mut self,
        timeout: Duration,
    ) -> std::io::Result<Option<std::process::ExitStatus>>;
}

#[cfg(any(unix, windows))]
impl CommandTimeoutExt for std::process::Child {
    /// Polls `try_wait` until the child exits or `timeout` has elapsed.
    ///
    /// The child is always polled at least once (so a zero timeout still notices a child
    /// that has already finished) and once more after the final sleep, so a child that
    /// exits just before the deadline is not misreported as timed out. Sleeps never run
    /// past the deadline.
    ///
    /// # Errors
    /// Propagates any I/O error from `try_wait`.
    fn wait_timeout(
        &mut self,
        timeout: Duration,
    ) -> std::io::Result<Option<std::process::ExitStatus>> {
        const POLL_INTERVAL: Duration = Duration::from_millis(10);

        let start = Instant::now();
        loop {
            if let Some(status) = self.try_wait()? {
                return Ok(Some(status));
            }

            let elapsed = start.elapsed();
            if elapsed >= timeout {
                return Ok(None);
            }

            // `elapsed < timeout` here, so the subtraction cannot underflow.
            std::thread::sleep(POLL_INTERVAL.min(timeout - elapsed));
        }
    }
}
488
489#[cfg(unix)]
490fn diff_timeval_ms(after: libc::timeval, before: libc::timeval) -> u64 {
491 #[allow(clippy::unnecessary_cast)]
492 let mut sec = after.tv_sec as i64 - before.tv_sec as i64;
493 #[allow(clippy::unnecessary_cast)]
494 let mut usec = after.tv_usec as i64 - before.tv_usec as i64;
495
496 if usec < 0 {
497 sec -= 1;
498 usec += 1_000_000;
499 }
500
501 if sec < 0 {
503 return 0;
504 }
505
506 (sec as u64) * 1000 + (usec as u64) / 1000
507}
508
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the trivial `cmd /c exit 0` invocation shared by the Windows tests.
    #[cfg(windows)]
    fn exit_zero_spec(name: &str) -> CommandSpec {
        CommandSpec {
            name: name.into(),
            argv: ["cmd", "/c", "exit", "0"]
                .iter()
                .map(|part| part.to_string())
                .collect(),
            ..Default::default()
        }
    }

    /// `truncate` clips to the cap, leaves shorter input alone and treats `0` as "no cap".
    #[test]
    fn truncate_works() {
        let data: Vec<u8> = (1..=5).collect();

        let clipped = truncate(data.clone(), 3);
        assert_eq!(clipped, vec![1, 2, 3]);

        let under_cap = truncate(data.clone(), 10);
        assert_eq!(under_cap, data);

        let uncapped = truncate(data.clone(), 0);
        assert_eq!(uncapped, data);
    }

    /// A single read into a `cap`-sized buffer yields at most `cap` bytes.
    #[test]
    fn read_with_cap_truncates() {
        fn read_with_cap<R: std::io::Read>(reader: &mut R, cap: usize) -> Vec<u8> {
            let mut scratch = vec![0u8; cap];
            let filled = reader.read(&mut scratch).unwrap();
            scratch.truncate(filled);
            scratch
        }

        let mut source: &[u8] = b"hello world";
        let captured = read_with_cap(&mut source, 5);
        assert_eq!(captured, b"hello");
    }

    /// The Windows runner reports a non-zero page-fault count for a real process.
    #[cfg(windows)]
    #[test]
    fn windows_page_faults_populated() {
        let spec = exit_zero_spec("page-faults-test");
        let result = StdProcessRunner
            .run(&spec)
            .expect("command should succeed");

        assert_eq!(result.exit_code, 0);
        assert!(
            result.page_faults.is_some(),
            "page_faults should be Some on Windows"
        );
        assert!(
            result.page_faults.unwrap() > 0,
            "page_faults should be > 0 for a real process"
        );
    }

    /// The Windows runner does not collect context switches.
    #[cfg(windows)]
    #[test]
    fn windows_ctx_switches_none() {
        let spec = exit_zero_spec("ctx-switches-test");
        let result = StdProcessRunner
            .run(&spec)
            .expect("command should succeed");

        assert!(
            result.ctx_switches.is_none(),
            "ctx_switches should be None on Windows"
        );
    }
}