Skip to main content

perfgate_adapters/
lib.rs

//! Std adapters for perfgate.
//!
//! In clean-arch terms: this is where we touch the world.
4
5mod fake;
6
7pub use fake::FakeProcessRunner;
8
9pub use perfgate_error::AdapterError;
10use perfgate_sha256::sha256_hex;
11use std::path::{Path, PathBuf};
12use std::time::{Duration, Instant};
13
14#[cfg(windows)]
15use std::os::windows::io::AsRawHandle;
16
/// Description of one command invocation handed to a [`ProcessRunner`].
#[derive(Clone, Debug, Default)]
pub struct CommandSpec {
    /// Label for the command; the runners in this file never read it.
    pub name: String,
    /// Working directory for the child; `None` leaves the directory unset.
    pub cwd: Option<PathBuf>,
    /// Program to launch (`argv[0]`) followed by its arguments.
    pub argv: Vec<String>,
    /// Environment variables set on the child, applied in order.
    pub env: Vec<(String, String)>,
    /// Upper bound on how long the Unix and Windows runners wait for the
    /// child; `None` waits until it exits.
    pub timeout: Option<Duration>,
    /// Per-stream cap on captured stdout/stderr bytes; `0` disables the cap.
    pub output_cap_bytes: usize,
}
27
/// Measurements and captured output from one finished command execution.
///
/// Every `Option` metric is `None` when the platform runner did not
/// collect it.
#[derive(Clone, Debug, Default)]
pub struct RunResult {
    /// Elapsed wall-clock time in milliseconds.
    pub wall_ms: u64,
    /// Exit code of the child; the runners in this file use `-1` when no
    /// code is available.
    pub exit_code: i32,
    /// Timeout flag. The runners in this file always report `false` here and
    /// signal a timeout through `AdapterError::Timeout` instead.
    pub timed_out: bool,
    /// User plus system CPU time, in milliseconds.
    /// Unix: taken from rusage. Windows: best-effort.
    pub cpu_ms: Option<u64>,
    /// Number of major page faults (Unix only).
    pub page_faults: Option<u64>,
    /// Sum of voluntary and involuntary context switches (Unix only).
    pub ctx_switches: Option<u64>,
    /// Peak resident set size, in KB.
    /// Unix: taken from rusage. Windows: best-effort.
    pub max_rss_kb: Option<u64>,
    /// Bytes read from disk (best-effort).
    pub io_read_bytes: Option<u64>,
    /// Bytes written to disk (best-effort).
    pub io_write_bytes: Option<u64>,
    /// Total number of network packets (best-effort).
    pub network_packets: Option<u64>,
    /// CPU energy consumed, in microjoules (RAPL on Linux).
    pub energy_uj: Option<u64>,
    /// On-disk size of the executed binary, in bytes (best-effort).
    pub binary_bytes: Option<u64>,
    /// Captured standard output, possibly truncated to the configured cap.
    pub stdout: Vec<u8>,
    /// Captured standard error, possibly truncated to the configured cap.
    pub stderr: Vec<u8>,
}
57
58pub trait ProcessRunner {
59    fn run(&self, spec: &CommandSpec) -> Result<RunResult, AdapterError>;
60}
61
/// Caps captured output at `cap` bytes; a `cap` of zero means "no limit"
/// and returns `bytes` unchanged.
fn truncate(mut bytes: Vec<u8>, cap: usize) -> Vec<u8> {
    match cap {
        // Zero is the "unlimited" sentinel.
        0 => bytes,
        limit => {
            // `Vec::truncate` does nothing when the vector is already short enough.
            bytes.truncate(limit);
            bytes
        }
    }
}
69
/// Fallback runner for targets that are neither Unix nor Windows.
///
/// Runs the command to completion with captured output and measures only
/// wall-clock time. `spec.timeout` is not consulted here, and every resource
/// metric except `binary_bytes` is reported as `None`.
#[cfg(all(not(unix), not(windows)))]
fn run_portable(spec: &CommandSpec) -> Result<RunResult, AdapterError> {
    use std::process::Command;

    let started = Instant::now();
    let binary_bytes = binary_bytes_for_command(spec);

    // `StdProcessRunner::run` rejects an empty argv before calling us.
    let mut command = Command::new(&spec.argv[0]);
    command.args(spec.argv.iter().skip(1));

    if let Some(dir) = spec.cwd.as_deref() {
        command.current_dir(dir);
    }

    command.envs(spec.env.iter().map(|(key, value)| (key, value)));

    let output = command.output().map_err(|err| AdapterError::RunCommand {
        command: spec.argv.join(" "),
        reason: err.to_string(),
    })?;

    let wall_ms = started.elapsed().as_millis() as u64;

    // Everything not listed keeps its `Default` value: `false` / `None`.
    Ok(RunResult {
        wall_ms,
        exit_code: output.status.code().unwrap_or(-1),
        binary_bytes,
        stdout: truncate(output.stdout, spec.output_cap_bytes),
        stderr: truncate(output.stderr, spec.output_cap_bytes),
        ..RunResult::default()
    })
}
114
/// Windows runner: spawns the command with piped stdout/stderr, waits for it
/// (honouring `spec.timeout`), and collects peak working set and I/O
/// transfer counters from the process handle.
///
/// Both pipes are drained on background threads from the moment the child
/// starts. Waiting for exit before reading would stall any child that writes
/// more than a pipe buffer of output, which then never exits (or is
/// misreported as a timeout).
///
/// # Errors
/// - `AdapterError::RunCommand` if the process cannot be spawned.
/// - `AdapterError::Other` if waiting on the process fails.
/// - `AdapterError::Timeout` if the timeout elapses; the child is killed and
///   waited on before returning.
#[cfg(windows)]
fn run_windows(spec: &CommandSpec) -> Result<RunResult, AdapterError> {
    use std::io::Read;
    use std::process::{Command, Stdio};
    use std::thread::JoinHandle;

    /// Reads `pipe` to EOF on its own thread so the child never blocks on a
    /// full pipe while we wait for it.
    fn drain_in_background<R>(pipe: Option<R>) -> Option<JoinHandle<Vec<u8>>>
    where
        R: Read + Send + 'static,
    {
        pipe.map(|mut pipe| {
            std::thread::spawn(move || {
                let mut buf = Vec::new();
                // Best-effort: keep whatever arrived before a read error.
                let _ = pipe.read_to_end(&mut buf);
                buf
            })
        })
    }

    /// Joins a reader thread, yielding an empty buffer if there was no pipe
    /// or the thread panicked.
    fn collect(reader: Option<JoinHandle<Vec<u8>>>) -> Vec<u8> {
        reader.and_then(|handle| handle.join().ok()).unwrap_or_default()
    }

    let start = Instant::now();
    let binary_bytes = binary_bytes_for_command(spec);

    // `StdProcessRunner::run` rejects an empty argv before calling us.
    let mut cmd = Command::new(&spec.argv[0]);
    cmd.args(&spec.argv[1..]);

    if let Some(cwd) = &spec.cwd {
        cmd.current_dir(cwd);
    }

    for (k, v) in &spec.env {
        cmd.env(k, v);
    }

    cmd.stdout(Stdio::piped());
    cmd.stderr(Stdio::piped());

    let mut child = cmd.spawn().map_err(|e| AdapterError::RunCommand {
        command: spec.argv.join(" "),
        reason: e.to_string(),
    })?;

    let stdout_reader = drain_in_background(child.stdout.take());
    let stderr_reader = drain_in_background(child.stderr.take());

    let exit_status = match spec.timeout {
        Some(timeout) => match child
            .wait_timeout(timeout)
            .map_err(|e| AdapterError::Other(e.to_string()))?
        {
            Some(status) => status,
            None => {
                child.kill().ok();
                // Wait so the process is really gone before we report back.
                // The reader threads are detached and end when the pipes close.
                child.wait().ok();
                return Err(AdapterError::Timeout);
            }
        },
        None => child
            .wait()
            .map_err(|e| AdapterError::Other(e.to_string()))?,
    };

    let wall_ms = start.elapsed().as_millis() as u64;
    let exit_code = exit_status.code().unwrap_or(-1);

    // The process handle stays open until `child` is dropped, so the
    // counters can still be queried after the process has exited.
    let handle = child.as_raw_handle();
    let max_rss_kb = get_max_rss_windows(handle);
    let (io_read_bytes, io_write_bytes) = get_io_counters_windows(handle);

    let stdout_buf = collect(stdout_reader);
    let stderr_buf = collect(stderr_reader);

    Ok(RunResult {
        wall_ms,
        exit_code,
        timed_out: false,
        cpu_ms: None,
        page_faults: None,
        ctx_switches: None,
        max_rss_kb,
        io_read_bytes,
        io_write_bytes,
        network_packets: None,
        energy_uj: None,
        binary_bytes,
        stdout: truncate(stdout_buf, spec.output_cap_bytes),
        stderr: truncate(stderr_buf, spec.output_cap_bytes),
    })
}
198
/// Peak working-set size of the process behind `handle`, in KB, or `None`
/// when `GetProcessMemoryInfo` fails.
#[cfg(windows)]
fn get_max_rss_windows(handle: std::os::windows::io::RawHandle) -> Option<u64> {
    use windows::Win32::Foundation::HANDLE;
    use windows::Win32::System::ProcessStatus::{GetProcessMemoryInfo, PROCESS_MEMORY_COUNTERS};

    let mut info = PROCESS_MEMORY_COUNTERS::default();
    let info_size = std::mem::size_of::<PROCESS_MEMORY_COUNTERS>() as u32;

    // SAFETY: `info` is a live out-parameter and `info_size` is its exact
    // size. The caller is expected to pass a handle that is still open.
    let outcome = unsafe { GetProcessMemoryInfo(HANDLE(handle as _), &mut info, info_size) };

    match outcome {
        // The API reports bytes; callers want kilobytes.
        Ok(_) => Some((info.PeakWorkingSetSize / 1024) as u64),
        Err(_) => None,
    }
}
219
/// Read and write transfer counts for the process behind `handle`, as
/// `(read, write)`; both are `None` when `GetProcessIoCounters` fails.
#[cfg(windows)]
fn get_io_counters_windows(handle: std::os::windows::io::RawHandle) -> (Option<u64>, Option<u64>) {
    use windows::Win32::Foundation::HANDLE;
    use windows::Win32::System::Threading::{GetProcessIoCounters, IO_COUNTERS};

    let mut io = IO_COUNTERS::default();

    // SAFETY: `io` is a live out-parameter of the type the API expects. The
    // caller is expected to pass a handle that is still open.
    let outcome = unsafe { GetProcessIoCounters(HANDLE(handle as _), &mut io) };

    match outcome {
        Ok(_) => (Some(io.ReadTransferCount), Some(io.WriteTransferCount)),
        Err(_) => (None, None),
    }
}
237
238#[cfg(unix)]
239fn run_unix(spec: &CommandSpec) -> Result<RunResult, AdapterError> {
240    use std::os::unix::process::ExitStatusExt;
241    use std::process::{Command, Stdio};
242
243    let binary_bytes = binary_bytes_for_command(spec);
244    let mut cmd = Command::new(&spec.argv[0]);
245    if spec.argv.len() > 1 {
246        cmd.args(&spec.argv[1..]);
247    }
248
249    if let Some(cwd) = &spec.cwd {
250        cmd.current_dir(cwd);
251    }
252
253    for (k, v) in &spec.env {
254        cmd.env(k, v);
255    }
256
257    cmd.stdout(Stdio::piped());
258    cmd.stderr(Stdio::piped());
259
260    let mut usage_before = unsafe { std::mem::zeroed::<libc::rusage>() };
261    let _ = unsafe { libc::getrusage(libc::RUSAGE_CHILDREN, &mut usage_before) };
262
263    let start = Instant::now();
264
265    let out = if let Some(timeout) = spec.timeout {
266        let mut child = cmd.spawn().map_err(|e| AdapterError::RunCommand {
267            command: spec.argv.join(" "),
268            reason: e.to_string(),
269        })?;
270
271        match child
272            .wait_timeout(timeout)
273            .map_err(|e| AdapterError::Other(e.to_string()))?
274        {
275            Some(_) => child
276                .wait_with_output()
277                .map_err(|e| AdapterError::RunCommand {
278                    command: spec.argv.join(" "),
279                    reason: e.to_string(),
280                })?,
281            None => {
282                child.kill().ok();
283                return Err(AdapterError::Timeout);
284            }
285        }
286    } else {
287        cmd.output().map_err(|e| AdapterError::RunCommand {
288            command: spec.argv.join(" "),
289            reason: e.to_string(),
290        })?
291    };
292
293    let wall_ms = start.elapsed().as_millis() as u64;
294    let exit_code = out
295        .status
296        .code()
297        .or_else(|| out.status.signal())
298        .unwrap_or(-1);
299
300    let mut cpu_ms = None;
301    let mut max_rss_kb = None;
302    let mut page_faults = None;
303    let mut ctx_switches = None;
304
305    let mut usage_after = unsafe { std::mem::zeroed::<libc::rusage>() };
306    if unsafe { libc::getrusage(libc::RUSAGE_CHILDREN, &mut usage_after) } == 0 {
307        let user_ms = diff_timeval_ms(usage_after.ru_utime, usage_before.ru_utime);
308        let sys_ms = diff_timeval_ms(usage_after.ru_stime, usage_before.ru_stime);
309
310        cpu_ms = Some(user_ms.saturating_add(sys_ms));
311        max_rss_kb = Some(usage_after.ru_maxrss as u64);
312        page_faults =
313            Some((usage_after.ru_majflt as u64).saturating_sub(usage_before.ru_majflt as u64));
314        ctx_switches = Some(
315            (usage_after.ru_nvcsw as u64)
316                .saturating_sub(usage_before.ru_nvcsw as u64)
317                .saturating_add(
318                    (usage_after.ru_nivcsw as u64).saturating_sub(usage_before.ru_nivcsw as u64),
319                ),
320        );
321    }
322
323    Ok(RunResult {
324        wall_ms,
325        exit_code,
326        timed_out: false,
327        cpu_ms,
328        page_faults,
329        ctx_switches,
330        max_rss_kb,
331        io_read_bytes: None,
332        io_write_bytes: None,
333        network_packets: None,
334        energy_uj: None,
335        binary_bytes,
336        stdout: truncate(out.stdout, spec.output_cap_bytes),
337        stderr: truncate(out.stderr, spec.output_cap_bytes),
338    })
339}
340
341/// Standard process runner using std::process::Command.
342#[derive(Clone, Debug, Default)]
343pub struct StdProcessRunner;
344
345impl ProcessRunner for StdProcessRunner {
346    fn run(&self, spec: &CommandSpec) -> Result<RunResult, AdapterError> {
347        if spec.argv.is_empty() {
348            return Err(AdapterError::EmptyArgv);
349        }
350
351        #[cfg(windows)]
352        {
353            run_windows(spec)
354        }
355        #[cfg(unix)]
356        {
357            run_unix(spec)
358        }
359        #[cfg(all(not(unix), not(windows)))]
360        {
361            run_portable(spec)
362        }
363    }
364}
365
366/// Host fingerprinting and metadata collection.
367pub trait HostProbe {
368    fn probe(&self, options: &HostProbeOptions) -> perfgate_types::HostInfo;
369}
370
/// Switches controlling what a [`HostProbe`] collects.
#[derive(Clone, Debug, Default)]
pub struct HostProbeOptions {
    /// When `true`, the probe includes a hash of the hostname; off by default.
    pub include_hostname_hash: bool,
}
375
376#[derive(Clone, Debug, Default)]
377pub struct StdHostProbe;
378
379impl HostProbe for StdHostProbe {
380    fn probe(&self, options: &HostProbeOptions) -> perfgate_types::HostInfo {
381        let hostname_hash = if options.include_hostname_hash {
382            hostname::get()
383                .ok()
384                .map(|h| sha256_hex(h.to_string_lossy().as_bytes()))
385        } else {
386            None
387        };
388
389        perfgate_types::HostInfo {
390            os: std::env::consts::OS.to_string(),
391            arch: std::env::consts::ARCH.to_string(),
392            cpu_count: Some(num_cpus::get_physical() as u32),
393            memory_bytes: Some(get_total_memory()),
394            hostname_hash,
395        }
396    }
397}
398
/// Total physical memory of the host in bytes, or `0` when it cannot be
/// determined.
///
/// - Windows: `GlobalMemoryStatusEx`.
/// - Linux/Android: the `MemTotal` line of `/proc/meminfo`.
/// - Everything else: no probe is implemented, so the result is `0`.
fn get_total_memory() -> u64 {
    #[cfg(windows)]
    {
        use windows::Win32::System::SystemInformation::{GlobalMemoryStatusEx, MEMORYSTATUSEX};
        // The API requires `dwLength` to be set before the call.
        let mut mem_status = MEMORYSTATUSEX {
            dwLength: std::mem::size_of::<MEMORYSTATUSEX>() as u32,
            ..Default::default()
        };
        // SAFETY: `mem_status` is a live out-parameter with `dwLength` set
        // to its own size.
        unsafe {
            if GlobalMemoryStatusEx(&mut mem_status).is_ok() {
                mem_status.ullTotalPhys
            } else {
                0
            }
        }
    }
    #[cfg(any(target_os = "linux", target_os = "android"))]
    {
        std::fs::read_to_string("/proc/meminfo")
            .ok()
            .and_then(|text| meminfo_total_bytes(&text))
            .unwrap_or(0)
    }
    #[cfg(all(unix, not(any(target_os = "linux", target_os = "android"))))]
    {
        // No std-only probe for the remaining Unix targets.
        0
    }
    #[cfg(all(not(unix), not(windows)))]
    {
        0
    }
}

/// Extracts `MemTotal` from `/proc/meminfo`-formatted text and converts it
/// from kB to bytes; `None` if the line is missing or malformed.
#[cfg(any(target_os = "linux", target_os = "android"))]
fn meminfo_total_bytes(meminfo: &str) -> Option<u64> {
    let rest = meminfo
        .lines()
        .find_map(|line| line.strip_prefix("MemTotal:"))?;
    let mut fields = rest.split_whitespace();
    let amount: u64 = fields.next()?.parse().ok()?;
    match fields.next() {
        Some("kB") => amount.checked_mul(1024),
        _ => None,
    }
}
425
426fn binary_bytes_for_command(spec: &CommandSpec) -> Option<u64> {
427    spec.argv.first().and_then(|cmd| {
428        let path = Path::new(cmd);
429        if path.exists() {
430            std::fs::metadata(path).ok().map(|m| m.len())
431        } else {
432            // Try searching in PATH
433            which::which(cmd)
434                .ok()
435                .and_then(|p| std::fs::metadata(p).ok().map(|m| m.len()))
436        }
437    })
438}
439
/// Extension trait adding a bounded wait to `std::process::Child` on
/// Windows and Unix.
trait CommandTimeoutExt {
    /// Waits up to `timeout` for the child to exit.
    ///
    /// Returns `Ok(Some(status))` if the child exited in time and `Ok(None)`
    /// if it is still running once the timeout has elapsed. The child is
    /// not killed; that is left to the caller.
    fn wait_timeout(
        &mut self,
        timeout: Duration,
    ) -> std::io::Result<Option<std::process::ExitStatus>>;
}

#[cfg(windows)]
impl CommandTimeoutExt for std::process::Child {
    fn wait_timeout(
        &mut self,
        timeout: Duration,
    ) -> std::io::Result<Option<std::process::ExitStatus>> {
        use windows::Win32::Foundation::{HANDLE, WAIT_OBJECT_0, WAIT_TIMEOUT};
        use windows::Win32::System::Threading::WaitForSingleObject;

        let handle = self.as_raw_handle();
        // `WaitForSingleObject` takes a u32 millisecond count in which
        // `u32::MAX` means "wait forever". Clamp instead of casting so that
        // a long timeout neither wraps to a short one nor becomes infinite.
        let millis = timeout.as_millis().min(u128::from(u32::MAX - 1)) as u32;

        // SAFETY: `handle` belongs to `self`, which is borrowed for the whole
        // call, so it stays open while the wait is in progress.
        let res = unsafe { WaitForSingleObject(HANDLE(handle as _), millis) };
        if res == WAIT_OBJECT_0 {
            self.wait().map(Some)
        } else if res == WAIT_TIMEOUT {
            Ok(None)
        } else {
            Err(std::io::Error::last_os_error())
        }
    }
}

#[cfg(unix)]
impl CommandTimeoutExt for std::process::Child {
    fn wait_timeout(
        &mut self,
        timeout: Duration,
    ) -> std::io::Result<Option<std::process::ExitStatus>> {
        const POLL_INTERVAL: Duration = Duration::from_millis(10);

        let start = Instant::now();
        loop {
            // Poll before checking the clock. This way a zero timeout still
            // observes a child that has already exited, and one last poll
            // always happens at or after the deadline.
            if let Some(status) = self.try_wait()? {
                return Ok(Some(status));
            }

            let elapsed = start.elapsed();
            if elapsed >= timeout {
                return Ok(None);
            }

            // Never sleep past the deadline.
            std::thread::sleep(POLL_INTERVAL.min(timeout - elapsed));
        }
    }
}
489
490#[cfg(unix)]
491fn diff_timeval_ms(after: libc::timeval, before: libc::timeval) -> u64 {
492    #[allow(clippy::unnecessary_cast)]
493    let mut sec = after.tv_sec as i64 - before.tv_sec as i64;
494    #[allow(clippy::unnecessary_cast)]
495    let mut usec = after.tv_usec as i64 - before.tv_usec as i64;
496
497    if usec < 0 {
498        sec -= 1;
499        usec += 1_000_000;
500    }
501
502    // Ensure we don't underflow if rusage somehow goes backwards (unlikely)
503    if sec < 0 {
504        return 0;
505    }
506
507    (sec as u64) * 1000 + (usec as u64) / 1000
508}
509
#[cfg(test)]
mod tests {
    use super::*;

    /// `truncate` cuts at the cap, leaves shorter input alone, and treats a
    /// cap of zero as "no cap".
    #[test]
    fn truncate_works() {
        let input: Vec<u8> = vec![1, 2, 3, 4, 5];

        let capped = truncate(input.clone(), 3);
        assert_eq!(capped, vec![1u8, 2, 3]);

        let roomy = truncate(input.clone(), 10);
        assert_eq!(roomy, input);

        let uncapped = truncate(input.clone(), 0);
        assert_eq!(uncapped, input);
    }

    /// A single capped read returns at most `cap` bytes.
    #[test]
    fn read_with_cap_truncates() {
        // Local helper: one `read` into a `cap`-sized buffer, trimmed to
        // the number of bytes actually read.
        fn read_with_cap<R: std::io::Read>(reader: &mut R, cap: usize) -> Vec<u8> {
            let mut scratch = vec![0u8; cap];
            let filled = reader.read(&mut scratch).unwrap();
            scratch.truncate(filled);
            scratch
        }

        let mut source: &[u8] = b"hello world";
        let head = read_with_cap(&mut source, 5);
        assert_eq!(head, b"hello");
    }
}