Skip to main content

perfgate_adapters/
lib.rs

1//! Process execution and host probing adapters for perfgate.
2//!
3//! In clean-arch terms this is where perfgate touches the world: running child
4//! processes, collecting CPU time / RSS via platform APIs, and probing host
5//! environment metadata for mismatch detection.
6//!
7//! Part of the [perfgate](https://github.com/EffortlessMetrics/perfgate) workspace.
8//!
9//! # Example
10//!
11//! ```no_run
12//! use perfgate_adapters::{StdProcessRunner, ProcessRunner, CommandSpec};
13//!
14//! let runner = StdProcessRunner;
15//! let spec = CommandSpec {
16//!     name: "echo".into(),
17//!     argv: vec!["hello".into()],
18//!     ..Default::default()
19//! };
20//! let result = runner.run(&spec).unwrap();
21//! println!("wall_ms: {}", result.wall_ms);
22//! ```
23
24mod fake;
25
26pub use fake::FakeProcessRunner;
27
28pub use perfgate_error::AdapterError;
29use perfgate_sha256::sha256_hex;
30use std::path::{Path, PathBuf};
31use std::time::{Duration, Instant};
32
33#[cfg(windows)]
34use std::os::windows::io::AsRawHandle;
35
/// Description of a single command invocation handed to a [`ProcessRunner`].
#[derive(Clone, Debug, Default)]
pub struct CommandSpec {
    /// Caller-chosen label for the command; the runners in this file never read it.
    pub name: String,
    /// Working directory for the child. `None` leaves the child's directory unset,
    /// so it starts wherever the parent currently is.
    pub cwd: Option<PathBuf>,
    /// Program to launch (`argv[0]`) followed by its arguments.
    pub argv: Vec<String>,
    /// Extra `(key, value)` environment variables applied to the child.
    pub env: Vec<(String, String)>,
    /// Wall-clock limit for the run. `None` waits for the child indefinitely.
    /// The Unix and Windows runners honour it; the portable fallback does not.
    pub timeout: Option<Duration>,
    /// Maximum number of bytes kept from each of stdout and stderr.
    /// `0` disables the cap.
    pub output_cap_bytes: usize,
}
46
/// Measurements and captured output from one execution of a [`CommandSpec`].
///
/// Every `Option` metric is best-effort: `None` means the platform runner did
/// not (or could not) collect it.
#[derive(Clone, Debug, Default)]
pub struct RunResult {
    /// Elapsed wall-clock time in milliseconds.
    pub wall_ms: u64,
    /// Exit code of the child. The runners in this file use `-1` when no code
    /// is available; the Unix runner reports the signal number if the child was
    /// terminated by a signal.
    pub exit_code: i32,
    /// Whether the run hit its timeout. The runners in this file always set
    /// this to `false` and report timeouts as an error instead.
    pub timed_out: bool,
    /// CPU time (user + system) in milliseconds.
    /// Collected on Unix via rusage; the Windows runner in this file leaves it `None`.
    pub cpu_ms: Option<u64>,
    /// Page faults. On Unix: major page faults from rusage.
    /// On Windows: total page faults from GetProcessMemoryInfo (PageFaultCount).
    pub page_faults: Option<u64>,
    /// Voluntary + involuntary context switches (Unix only; `None` on Windows).
    pub ctx_switches: Option<u64>,
    /// Peak resident set size in KB.
    /// Unix: `ru_maxrss` from rusage. Windows: `PeakWorkingSetSize / 1024`.
    pub max_rss_kb: Option<u64>,
    /// Bytes read by the process (Windows: `ReadTransferCount`; `None` elsewhere).
    pub io_read_bytes: Option<u64>,
    /// Bytes written by the process (Windows: `WriteTransferCount`; `None` elsewhere).
    pub io_write_bytes: Option<u64>,
    /// Total network packets (best-effort; not populated by the runners in this file).
    pub network_packets: Option<u64>,
    /// CPU energy used in microjoules (RAPL on Linux; not populated by the
    /// runners in this file).
    pub energy_uj: Option<u64>,
    /// Size in bytes of the executable that was launched, when it could be located.
    pub binary_bytes: Option<u64>,
    /// Captured standard output, truncated to `CommandSpec::output_cap_bytes`.
    pub stdout: Vec<u8>,
    /// Captured standard error, truncated to `CommandSpec::output_cap_bytes`.
    pub stderr: Vec<u8>,
}
77
78pub trait ProcessRunner {
79    fn run(&self, spec: &CommandSpec) -> Result<RunResult, AdapterError>;
80}
81
/// Clamps a captured output buffer to at most `cap` bytes.
///
/// A `cap` of `0` means "no limit": the buffer is returned untouched.
fn truncate(mut bytes: Vec<u8>, cap: usize) -> Vec<u8> {
    let unlimited = cap == 0;
    if !unlimited {
        // `Vec::truncate` is a no-op when the buffer is already short enough.
        bytes.truncate(cap);
    }
    bytes
}
89
/// Fallback runner for targets that are neither Unix nor Windows.
///
/// Only wall-clock time, exit code, binary size and captured output are
/// reported; all resource metrics stay `None`. `spec.timeout` is not enforced
/// here: the call blocks until the child exits.
///
/// # Errors
///
/// * `AdapterError::EmptyArgv` if `spec.argv` is empty.
/// * `AdapterError::RunCommand` if the process cannot be spawned or waited on.
#[cfg(all(not(unix), not(windows)))]
fn run_portable(spec: &CommandSpec) -> Result<RunResult, AdapterError> {
    use std::process::Command;

    // Never index `argv[0]` blindly: report an empty argv as an error rather
    // than panicking if this helper is ever called without the caller's guard.
    let (program, args) = spec.argv.split_first().ok_or(AdapterError::EmptyArgv)?;

    // Resolve the binary size *before* starting the clock so the PATH search
    // and metadata lookup are not billed to the measured command.
    let binary_bytes = binary_bytes_for_command(spec);

    let mut cmd = Command::new(program);
    cmd.args(args);

    if let Some(cwd) = &spec.cwd {
        cmd.current_dir(cwd);
    }

    for (k, v) in &spec.env {
        cmd.env(k, v);
    }

    let start = Instant::now();
    let out = cmd.output().map_err(|e| AdapterError::RunCommand {
        command: spec.argv.join(" "),
        reason: e.to_string(),
    })?;
    let wall_ms = start.elapsed().as_millis() as u64;

    // No exit code (e.g. abnormal termination) is reported as -1.
    let exit_code = out.status.code().unwrap_or(-1);

    Ok(RunResult {
        wall_ms,
        exit_code,
        timed_out: false,
        cpu_ms: None,
        page_faults: None,
        ctx_switches: None,
        max_rss_kb: None,
        io_read_bytes: None,
        io_write_bytes: None,
        network_packets: None,
        energy_uj: None,
        binary_bytes,
        stdout: truncate(out.stdout, spec.output_cap_bytes),
        stderr: truncate(out.stderr, spec.output_cap_bytes),
    })
}
134
/// Runs `spec` on Windows and collects wall time, peak working set, page
/// faults and I/O transfer counts.
///
/// stdout and stderr are drained on helper threads *while* the child runs.
/// Reading them only after the child exits would let a child that writes more
/// than the pipe buffer block forever (or until the timeout fires).
///
/// # Errors
///
/// * `AdapterError::EmptyArgv` if `spec.argv` is empty.
/// * `AdapterError::RunCommand` if the process cannot be spawned.
/// * `AdapterError::Timeout` if `spec.timeout` elapses; the child is killed.
/// * `AdapterError::Other` if waiting on the child fails.
#[cfg(windows)]
fn run_windows(spec: &CommandSpec) -> Result<RunResult, AdapterError> {
    use std::io::Read;
    use std::process::{Command, Stdio};

    /// Reads `pipe` to EOF on a helper thread and hands back what was captured.
    fn drain<R: Read + Send + 'static>(mut pipe: R) -> std::thread::JoinHandle<Vec<u8>> {
        std::thread::spawn(move || {
            let mut buf = Vec::new();
            // Best-effort capture: a read error just ends the capture early.
            pipe.read_to_end(&mut buf).ok();
            buf
        })
    }

    let (program, args) = spec.argv.split_first().ok_or(AdapterError::EmptyArgv)?;

    // Resolve the binary size *before* starting the clock so the PATH search
    // and metadata lookup are not billed to the measured command.
    let binary_bytes = binary_bytes_for_command(spec);

    let mut cmd = Command::new(program);
    cmd.args(args);

    if let Some(cwd) = &spec.cwd {
        cmd.current_dir(cwd);
    }

    for (k, v) in &spec.env {
        cmd.env(k, v);
    }

    cmd.stdout(Stdio::piped());
    cmd.stderr(Stdio::piped());

    let start = Instant::now();
    let mut child = cmd.spawn().map_err(|e| AdapterError::RunCommand {
        command: spec.argv.join(" "),
        reason: e.to_string(),
    })?;

    // Start draining immediately so the child can never stall on a full pipe.
    let stdout_reader = child.stdout.take().map(drain);
    let stderr_reader = child.stderr.take().map(drain);

    let exit_status = match spec.timeout {
        Some(timeout) => match child
            .wait_timeout(timeout)
            .map_err(|e| AdapterError::Other(e.to_string()))?
        {
            Some(status) => status,
            None => {
                child.kill().ok();
                // Wait for the killed child so it is fully gone before returning.
                // The reader threads are left detached: they finish once the
                // pipes close.
                child.wait().ok();
                return Err(AdapterError::Timeout);
            }
        },
        None => child
            .wait()
            .map_err(|e| AdapterError::Other(e.to_string()))?,
    };

    let wall_ms = start.elapsed().as_millis() as u64;
    let exit_code = exit_status.code().unwrap_or(-1);

    // `child` still owns the process handle here, so it can be passed to
    // GetProcessMemoryInfo / GetProcessIoCounters after the process has exited.
    let handle = child.as_raw_handle();
    let (max_rss_kb, page_faults) = get_memory_info_windows(handle);
    let (io_read_bytes, io_write_bytes) = get_io_counters_windows(handle);

    // A panicked or missing reader yields an empty capture rather than an error.
    let stdout_buf = stdout_reader
        .and_then(|reader| reader.join().ok())
        .unwrap_or_default();
    let stderr_buf = stderr_reader
        .and_then(|reader| reader.join().ok())
        .unwrap_or_default();

    Ok(RunResult {
        wall_ms,
        exit_code,
        timed_out: false,
        cpu_ms: None,
        page_faults,
        ctx_switches: None,
        max_rss_kb,
        io_read_bytes,
        io_write_bytes,
        network_packets: None,
        energy_uj: None,
        binary_bytes,
        stdout: truncate(stdout_buf, spec.output_cap_bytes),
        stderr: truncate(stderr_buf, spec.output_cap_bytes),
    })
}
218
/// Queries `GetProcessMemoryInfo` for the process behind `handle`.
///
/// Returns `(max_rss_kb, page_faults)`: `PeakWorkingSetSize` divided by 1024
/// and `PageFaultCount`. Both are `None` if the call fails.
#[cfg(windows)]
fn get_memory_info_windows(handle: std::os::windows::io::RawHandle) -> (Option<u64>, Option<u64>) {
    use windows::Win32::Foundation::HANDLE;
    use windows::Win32::System::ProcessStatus::{GetProcessMemoryInfo, PROCESS_MEMORY_COUNTERS};

    let mut counters = PROCESS_MEMORY_COUNTERS::default();
    let counters_len = std::mem::size_of::<PROCESS_MEMORY_COUNTERS>() as u32;

    // SAFETY: `counters` is a live, writable struct and `counters_len` is its
    // exact size. NOTE(review): assumes `handle` is a valid process handle for
    // the duration of the call — confirm at call sites.
    let succeeded =
        unsafe { GetProcessMemoryInfo(HANDLE(handle as _), &mut counters, counters_len) }.is_ok();

    if !succeeded {
        return (None, None);
    }

    let peak_kb = (counters.PeakWorkingSetSize / 1024) as u64;
    let faults = counters.PageFaultCount as u64;
    (Some(peak_kb), Some(faults))
}
243
/// Queries `GetProcessIoCounters` for the process behind `handle`.
///
/// Returns `(io_read_bytes, io_write_bytes)` taken from `ReadTransferCount`
/// and `WriteTransferCount`. Both are `None` if the call fails.
#[cfg(windows)]
fn get_io_counters_windows(handle: std::os::windows::io::RawHandle) -> (Option<u64>, Option<u64>) {
    use windows::Win32::Foundation::HANDLE;
    use windows::Win32::System::Threading::{GetProcessIoCounters, IO_COUNTERS};

    let mut counters = IO_COUNTERS::default();

    // SAFETY: `counters` is a live, writable IO_COUNTERS value.
    // NOTE(review): assumes `handle` is a valid process handle for the duration
    // of the call — confirm at call sites.
    let succeeded = unsafe { GetProcessIoCounters(HANDLE(handle as _), &mut counters) }.is_ok();

    if !succeeded {
        return (None, None);
    }

    (
        Some(counters.ReadTransferCount),
        Some(counters.WriteTransferCount),
    )
}
261
262#[cfg(unix)]
263fn run_unix(spec: &CommandSpec) -> Result<RunResult, AdapterError> {
264    use std::os::unix::process::ExitStatusExt;
265    use std::process::{Command, Stdio};
266
267    let binary_bytes = binary_bytes_for_command(spec);
268    let mut cmd = Command::new(&spec.argv[0]);
269    if spec.argv.len() > 1 {
270        cmd.args(&spec.argv[1..]);
271    }
272
273    if let Some(cwd) = &spec.cwd {
274        cmd.current_dir(cwd);
275    }
276
277    for (k, v) in &spec.env {
278        cmd.env(k, v);
279    }
280
281    cmd.stdout(Stdio::piped());
282    cmd.stderr(Stdio::piped());
283
284    let mut usage_before = unsafe { std::mem::zeroed::<libc::rusage>() };
285    let _ = unsafe { libc::getrusage(libc::RUSAGE_CHILDREN, &mut usage_before) };
286
287    let start = Instant::now();
288
289    let out = if let Some(timeout) = spec.timeout {
290        let mut child = cmd.spawn().map_err(|e| AdapterError::RunCommand {
291            command: spec.argv.join(" "),
292            reason: e.to_string(),
293        })?;
294
295        match child
296            .wait_timeout(timeout)
297            .map_err(|e| AdapterError::Other(e.to_string()))?
298        {
299            Some(_) => child
300                .wait_with_output()
301                .map_err(|e| AdapterError::RunCommand {
302                    command: spec.argv.join(" "),
303                    reason: e.to_string(),
304                })?,
305            None => {
306                child.kill().ok();
307                return Err(AdapterError::Timeout);
308            }
309        }
310    } else {
311        cmd.output().map_err(|e| AdapterError::RunCommand {
312            command: spec.argv.join(" "),
313            reason: e.to_string(),
314        })?
315    };
316
317    let wall_ms = start.elapsed().as_millis() as u64;
318    let exit_code = out
319        .status
320        .code()
321        .or_else(|| out.status.signal())
322        .unwrap_or(-1);
323
324    let mut cpu_ms = None;
325    let mut max_rss_kb = None;
326    let mut page_faults = None;
327    let mut ctx_switches = None;
328
329    let mut usage_after = unsafe { std::mem::zeroed::<libc::rusage>() };
330    if unsafe { libc::getrusage(libc::RUSAGE_CHILDREN, &mut usage_after) } == 0 {
331        let user_ms = diff_timeval_ms(usage_after.ru_utime, usage_before.ru_utime);
332        let sys_ms = diff_timeval_ms(usage_after.ru_stime, usage_before.ru_stime);
333
334        cpu_ms = Some(user_ms.saturating_add(sys_ms));
335        max_rss_kb = Some(usage_after.ru_maxrss as u64);
336        page_faults =
337            Some((usage_after.ru_majflt as u64).saturating_sub(usage_before.ru_majflt as u64));
338        ctx_switches = Some(
339            (usage_after.ru_nvcsw as u64)
340                .saturating_sub(usage_before.ru_nvcsw as u64)
341                .saturating_add(
342                    (usage_after.ru_nivcsw as u64).saturating_sub(usage_before.ru_nivcsw as u64),
343                ),
344        );
345    }
346
347    Ok(RunResult {
348        wall_ms,
349        exit_code,
350        timed_out: false,
351        cpu_ms,
352        page_faults,
353        ctx_switches,
354        max_rss_kb,
355        io_read_bytes: None,
356        io_write_bytes: None,
357        network_packets: None,
358        energy_uj: None,
359        binary_bytes,
360        stdout: truncate(out.stdout, spec.output_cap_bytes),
361        stderr: truncate(out.stderr, spec.output_cap_bytes),
362    })
363}
364
/// [`ProcessRunner`] backed by `std::process::Command`.
///
/// Stateless unit type: every call to `run` spawns a fresh child process.
#[derive(Debug, Default, Clone)]
pub struct StdProcessRunner;
368
369impl ProcessRunner for StdProcessRunner {
370    fn run(&self, spec: &CommandSpec) -> Result<RunResult, AdapterError> {
371        if spec.argv.is_empty() {
372            return Err(AdapterError::EmptyArgv);
373        }
374
375        #[cfg(windows)]
376        {
377            run_windows(spec)
378        }
379        #[cfg(unix)]
380        {
381            run_unix(spec)
382        }
383        #[cfg(all(not(unix), not(windows)))]
384        {
385            run_portable(spec)
386        }
387    }
388}
389
390/// Host fingerprinting and metadata collection.
391pub trait HostProbe {
392    fn probe(&self, options: &HostProbeOptions) -> perfgate_types::HostInfo;
393}
394
/// Switches controlling what a [`HostProbe`] collects.
#[derive(Clone, Debug, Default)]
pub struct HostProbeOptions {
    /// When `true`, the probe includes a SHA-256 hash of the hostname;
    /// when `false` (the default) the hash is omitted.
    pub include_hostname_hash: bool,
}
399
/// [`HostProbe`] implementation for the machine the current process runs on.
#[derive(Debug, Default, Clone)]
pub struct StdHostProbe;
402
403impl HostProbe for StdHostProbe {
404    fn probe(&self, options: &HostProbeOptions) -> perfgate_types::HostInfo {
405        let hostname_hash = if options.include_hostname_hash {
406            hostname::get()
407                .ok()
408                .map(|h| sha256_hex(h.to_string_lossy().as_bytes()))
409        } else {
410            None
411        };
412
413        perfgate_types::HostInfo {
414            os: std::env::consts::OS.to_string(),
415            arch: std::env::consts::ARCH.to_string(),
416            cpu_count: Some(num_cpus::get_physical() as u32),
417            memory_bytes: Some(get_total_memory()),
418            hostname_hash,
419        }
420    }
421}
422
/// Returns the machine's total memory in bytes, or `0` when it cannot be
/// determined on this platform.
///
/// * Windows: `ullTotalPhys` from `GlobalMemoryStatusEx`.
/// * Linux / Android: the `MemTotal` line of `/proc/meminfo` (reported in kB).
/// * macOS: `sysconf(_SC_PHYS_PAGES) * sysconf(_SC_PAGESIZE)`.
/// * Everything else: `0` (not implemented).
fn get_total_memory() -> u64 {
    #[cfg(windows)]
    {
        use windows::Win32::System::SystemInformation::{GlobalMemoryStatusEx, MEMORYSTATUSEX};
        // The API requires `dwLength` to be set to the struct size before the call.
        let mut mem_status = MEMORYSTATUSEX {
            dwLength: std::mem::size_of::<MEMORYSTATUSEX>() as u32,
            ..Default::default()
        };
        // SAFETY: `mem_status` is a live, writable struct with `dwLength` set.
        unsafe {
            if GlobalMemoryStatusEx(&mut mem_status).is_ok() {
                mem_status.ullTotalPhys
            } else {
                0
            }
        }
    }
    #[cfg(any(target_os = "linux", target_os = "android"))]
    {
        // The line looks like `MemTotal:       16314520 kB`.
        std::fs::read_to_string("/proc/meminfo")
            .ok()
            .and_then(|meminfo| {
                meminfo.lines().find_map(|line| {
                    let rest = line.strip_prefix("MemTotal:")?;
                    let kib: u64 = rest.split_whitespace().next()?.parse().ok()?;
                    Some(kib.saturating_mul(1024))
                })
            })
            .unwrap_or(0)
    }
    #[cfg(target_os = "macos")]
    {
        // SAFETY: `sysconf` takes no pointers and has no preconditions.
        let pages = unsafe { libc::sysconf(libc::_SC_PHYS_PAGES) };
        let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
        // `sysconf` returns -1 on failure.
        if pages > 0 && page_size > 0 {
            (pages as u64).saturating_mul(page_size as u64)
        } else {
            0
        }
    }
    #[cfg(all(
        unix,
        not(any(target_os = "linux", target_os = "android", target_os = "macos"))
    ))]
    {
        // No probe implemented for the remaining Unix targets.
        0
    }
    #[cfg(all(not(unix), not(windows)))]
    {
        0
    }
}
449
450fn binary_bytes_for_command(spec: &CommandSpec) -> Option<u64> {
451    spec.argv.first().and_then(|cmd| {
452        let path = Path::new(cmd);
453        if path.exists() {
454            std::fs::metadata(path).ok().map(|m| m.len())
455        } else {
456            // Try searching in PATH
457            which::which(cmd)
458                .ok()
459                .and_then(|p| std::fs::metadata(p).ok().map(|m| m.len()))
460        }
461    })
462}
463
/// Extension trait adding a bounded wait to `std::process::Child`.
trait CommandTimeoutExt {
    /// Waits up to `timeout` for the child to exit.
    ///
    /// Returns `Ok(Some(status))` if the child exited, `Ok(None)` if it was
    /// still running when the timeout elapsed, and `Err` if polling failed.
    /// The child is *not* killed on timeout; that is the caller's job.
    fn wait_timeout(
        &mut self,
        timeout: Duration,
    ) -> std::io::Result<Option<std::process::ExitStatus>>;
}

#[cfg(any(unix, windows))]
impl CommandTimeoutExt for std::process::Child {
    fn wait_timeout(
        &mut self,
        timeout: Duration,
    ) -> std::io::Result<Option<std::process::ExitStatus>> {
        /// Upper bound on the time between two `try_wait` polls.
        const POLL_INTERVAL: Duration = Duration::from_millis(10);

        let start = Instant::now();
        loop {
            // Poll first, so an already-finished child is reported even with a
            // zero timeout, and once more after the final sleep.
            if let Some(status) = self.try_wait()? {
                return Ok(Some(status));
            }

            let remaining = match timeout.checked_sub(start.elapsed()) {
                Some(left) if left > Duration::from_millis(0) => left,
                _ => return Ok(None),
            };

            // Never sleep past the deadline: short timeouts stay short.
            std::thread::sleep(remaining.min(POLL_INTERVAL));
        }
    }
}
488
489#[cfg(unix)]
490fn diff_timeval_ms(after: libc::timeval, before: libc::timeval) -> u64 {
491    #[allow(clippy::unnecessary_cast)]
492    let mut sec = after.tv_sec as i64 - before.tv_sec as i64;
493    #[allow(clippy::unnecessary_cast)]
494    let mut usec = after.tv_usec as i64 - before.tv_usec as i64;
495
496    if usec < 0 {
497        sec -= 1;
498        usec += 1_000_000;
499    }
500
501    // Ensure we don't underflow if rusage somehow goes backwards (unlikely)
502    if sec < 0 {
503        return 0;
504    }
505
506    (sec as u64) * 1000 + (usec as u64) / 1000
507}
508
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a spec that runs `cmd /c exit 0` under the given label.
    #[cfg(windows)]
    fn exit_zero_spec(label: &str) -> CommandSpec {
        CommandSpec {
            name: label.into(),
            argv: vec!["cmd".into(), "/c".into(), "exit".into(), "0".into()],
            ..Default::default()
        }
    }

    /// `truncate` clamps long buffers, leaves short ones alone, and treats a
    /// cap of 0 as "no cap".
    #[test]
    fn truncate_works() {
        let data: Vec<u8> = (1..=5).collect();

        let clamped = truncate(data.clone(), 3);
        assert_eq!(clamped, vec![1, 2, 3]);

        let roomy = truncate(data.clone(), 10);
        assert_eq!(roomy, data);

        let uncapped = truncate(data.clone(), 0);
        assert_eq!(uncapped, data);
    }

    /// A local `read_with_cap` helper (defined inside the test) reads at most
    /// `cap` bytes from a reader.
    #[test]
    fn read_with_cap_truncates() {
        fn read_with_cap<R: std::io::Read>(reader: &mut R, cap: usize) -> Vec<u8> {
            let mut scratch = vec![0u8; cap];
            let filled = reader.read(&mut scratch).unwrap();
            scratch.truncate(filled);
            scratch
        }

        let mut source: &[u8] = b"hello world";
        assert_eq!(read_with_cap(&mut source, 5), b"hello");
    }

    /// On Windows, page_faults should be populated (Some) after running a command.
    #[cfg(windows)]
    #[test]
    fn windows_page_faults_populated() {
        let result = StdProcessRunner
            .run(&exit_zero_spec("page-faults-test"))
            .expect("command should succeed");
        assert_eq!(result.exit_code, 0);
        assert!(
            result.page_faults.is_some(),
            "page_faults should be Some on Windows"
        );
        // Any successfully spawned process is expected to incur at least a
        // handful of page faults.
        assert!(
            result.page_faults.unwrap() > 0,
            "page_faults should be > 0 for a real process"
        );
    }

    /// ctx_switches remains None on Windows (the Windows runner does not collect it).
    #[cfg(windows)]
    #[test]
    fn windows_ctx_switches_none() {
        let result = StdProcessRunner
            .run(&exit_zero_spec("ctx-switches-test"))
            .expect("command should succeed");
        assert!(
            result.ctx_switches.is_none(),
            "ctx_switches should be None on Windows"
        );
    }
}