Skip to main content

imp_core/workflow/
verification_runner.rs

1use std::path::{Path, PathBuf};
2use std::process::Stdio;
3use std::time::{Duration, Instant};
4
5use tokio::io::AsyncReadExt;
6use tokio::process::Command;
7
8use super::{
9    VerificationArtifactRef, VerificationCommand, VerificationGate, VerificationGateKind,
10    VerificationGateResult,
11};
12use crate::error::{Error, Result};
13
/// Timeout applied to a verification command that does not set its own (two minutes).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(2 * 60);
/// Default cap, in bytes, on the output kept per stream before truncation (64 KiB).
const MAX_CAPTURE_BYTES: usize = 64 << 10;
16
17#[derive(Debug, Clone)]
18pub struct VerificationGateRunner {
19    cwd: PathBuf,
20    artifact_root: PathBuf,
21    default_timeout: Duration,
22    max_capture_bytes: usize,
23}
24
25impl VerificationGateRunner {
26    pub fn new(cwd: impl Into<PathBuf>, artifact_root: impl Into<PathBuf>) -> Self {
27        Self {
28            cwd: cwd.into(),
29            artifact_root: artifact_root.into(),
30            default_timeout: DEFAULT_TIMEOUT,
31            max_capture_bytes: MAX_CAPTURE_BYTES,
32        }
33    }
34
35    pub fn with_default_timeout(mut self, timeout: Duration) -> Self {
36        self.default_timeout = timeout;
37        self
38    }
39
40    pub fn with_max_capture_bytes(mut self, bytes: usize) -> Self {
41        self.max_capture_bytes = bytes;
42        self
43    }
44
45    pub async fn run(&self, gate: &mut VerificationGate) -> Result<VerificationGateResult> {
46        let Some(command) = gate.command.clone() else {
47            gate.mark_blocked("verification gate has no command");
48            return Ok(VerificationGateResult {
49                summary: Some("verification gate has no command".into()),
50                ..VerificationGateResult::default()
51            });
52        };
53        if gate.kind != VerificationGateKind::Command {
54            gate.mark_blocked("only command verification gates are executable today");
55            return Ok(VerificationGateResult {
56                summary: Some("only command verification gates are executable today".into()),
57                ..VerificationGateResult::default()
58            });
59        }
60
61        gate.mark_running();
62        let started = Instant::now();
63        let cwd = command.cwd.clone().unwrap_or_else(|| self.cwd.clone());
64        let timeout = command.timeout.unwrap_or(self.default_timeout);
65        let gate_dir = self.artifact_root.join(sanitize_path_segment(&gate.id));
66        tokio::fs::create_dir_all(&gate_dir)
67            .await
68            .map_err(Error::Io)?;
69
70        let mut child = Command::new("/bin/sh")
71            .arg("-lc")
72            .arg(&command.command)
73            .current_dir(&cwd)
74            .stdin(Stdio::null())
75            .stdout(Stdio::piped())
76            .stderr(Stdio::piped())
77            .kill_on_drop(true)
78            .spawn()
79            .map_err(Error::Io)?;
80
81        let mut stdout = child.stdout.take().expect("stdout piped");
82        let mut stderr = child.stderr.take().expect("stderr piped");
83        let stdout_task = tokio::spawn(async move {
84            let mut bytes = Vec::new();
85            stdout.read_to_end(&mut bytes).await.map(|_| bytes)
86        });
87        let stderr_task = tokio::spawn(async move {
88            let mut bytes = Vec::new();
89            stderr.read_to_end(&mut bytes).await.map(|_| bytes)
90        });
91
92        let status = match tokio::time::timeout(timeout, child.wait()).await {
93            Ok(wait) => wait.map_err(Error::Io)?,
94            Err(_) => {
95                let _ = child.kill().await;
96                let stdout_bytes = join_output(stdout_task).await;
97                let stderr_bytes = join_output(stderr_task).await;
98                let result = self
99                    .write_artifacts(
100                        gate,
101                        &command,
102                        &cwd,
103                        started.elapsed(),
104                        None,
105                        stdout_bytes,
106                        stderr_bytes,
107                        &gate_dir,
108                        Some(format!(
109                            "verification command timed out after {}ms",
110                            timeout.as_millis()
111                        )),
112                    )
113                    .await?;
114                gate.mark_blocked(
115                    result
116                        .summary
117                        .clone()
118                        .unwrap_or_else(|| "verification command timed out".into()),
119                );
120                return Ok(result);
121            }
122        };
123
124        let stdout_bytes = join_output(stdout_task).await;
125        let stderr_bytes = join_output(stderr_task).await;
126        let exit_code = status.code();
127        let result = self
128            .write_artifacts(
129                gate,
130                &command,
131                &cwd,
132                started.elapsed(),
133                exit_code,
134                stdout_bytes,
135                stderr_bytes,
136                &gate_dir,
137                None,
138            )
139            .await?;
140
141        match exit_code {
142            Some(0) => gate.mark_passed(result.clone()),
143            _ => gate.mark_failed(result.clone()),
144        }
145        Ok(result)
146    }
147
148    #[allow(clippy::too_many_arguments)]
149    async fn write_artifacts(
150        &self,
151        gate: &mut VerificationGate,
152        command: &VerificationCommand,
153        cwd: &Path,
154        elapsed: Duration,
155        exit_code: Option<i32>,
156        stdout_bytes: Vec<u8>,
157        stderr_bytes: Vec<u8>,
158        gate_dir: &Path,
159        blocked_summary: Option<String>,
160    ) -> Result<VerificationGateResult> {
161        let stdout_capture = CapturedOutput::new(stdout_bytes, self.max_capture_bytes);
162        let stderr_capture = CapturedOutput::new(stderr_bytes, self.max_capture_bytes);
163        let stdout_path = gate_dir.join("stdout.log");
164        let stderr_path = gate_dir.join("stderr.log");
165        let status_path = gate_dir.join("status.json");
166
167        tokio::fs::write(&stdout_path, stdout_capture.content.as_bytes())
168            .await
169            .map_err(Error::Io)?;
170        tokio::fs::write(&stderr_path, stderr_capture.content.as_bytes())
171            .await
172            .map_err(Error::Io)?;
173
174        let summary = blocked_summary.unwrap_or_else(|| match exit_code {
175            Some(0) => "verification command passed".to_string(),
176            Some(code) => format!("verification command failed with exit code {code}"),
177            None => "verification command terminated without exit code".to_string(),
178        });
179
180        let result = VerificationGateResult {
181            exit_code,
182            duration_ms: Some(elapsed.as_millis() as u64),
183            summary: Some(summary),
184            stdout_summary: Some(stdout_capture.summary()),
185            stderr_summary: Some(stderr_capture.summary()),
186        };
187
188        let status_json = serde_json::json!({
189            "gate_id": gate.id,
190            "command": command.command,
191            "cwd": cwd,
192            "exit_code": exit_code,
193            "duration_ms": result.duration_ms,
194            "summary": result.summary,
195            "stdout_truncated": stdout_capture.truncated,
196            "stderr_truncated": stderr_capture.truncated,
197        });
198        tokio::fs::write(
199            &status_path,
200            serde_json::to_vec_pretty(&status_json).map_err(Error::Json)?,
201        )
202        .await
203        .map_err(Error::Io)?;
204
205        gate.artifacts = vec![
206            artifact_ref(
207                "stdout",
208                stdout_path,
209                stdout_capture.original_len,
210                stdout_capture.truncated,
211            ),
212            artifact_ref(
213                "stderr",
214                stderr_path,
215                stderr_capture.original_len,
216                stderr_capture.truncated,
217            ),
218            artifact_ref("status", status_path, None, false),
219        ];
220        Ok(result)
221    }
222}
223
224fn artifact_ref(
225    kind: &str,
226    path: PathBuf,
227    bytes: Option<usize>,
228    truncated: bool,
229) -> VerificationArtifactRef {
230    let mut artifact = VerificationArtifactRef::new(kind, path);
231    artifact.bytes = bytes.map(|bytes| bytes as u64);
232    if truncated {
233        artifact.redaction = Some("output truncated".into());
234    }
235    artifact
236}
237
238async fn join_output(task: tokio::task::JoinHandle<std::io::Result<Vec<u8>>>) -> Vec<u8> {
239    match task.await {
240        Ok(Ok(bytes)) => bytes,
241        _ => Vec::new(),
242    }
243}
244
/// Turns an arbitrary identifier into a string usable as a single directory name.
///
/// ASCII letters, ASCII digits, `-` and `_` are kept; every other character is replaced
/// by `-`. An empty input yields `"gate"`.
fn sanitize_path_segment(input: &str) -> String {
    if input.is_empty() {
        return "gate".into();
    }

    let mut segment = String::with_capacity(input.len());
    for ch in input.chars() {
        let keep = matches!(ch, 'a'..='z' | 'A'..='Z' | '0'..='9' | '-' | '_');
        segment.push(if keep { ch } else { '-' });
    }
    segment
}
262
/// Output of one stream after applying the capture limit, decoded for storage and reporting.
struct CapturedOutput {
    /// Lossily UTF-8-decoded output, followed by a truncation marker when it was cut.
    content: String,
    /// Number of bytes passed to [`CapturedOutput::new`], before any truncation.
    original_len: Option<usize>,
    /// Whether the raw output was longer than the capture limit.
    truncated: bool,
}

impl CapturedOutput {
    /// Maximum number of characters returned by [`CapturedOutput::summary`] before the ellipsis.
    const SUMMARY_CHARS: usize = 500;

    /// Decodes `bytes`, keeping at most `max_bytes` of them.
    ///
    /// When the input is longer than `max_bytes` it is cut and a
    /// `[verification output truncated]` marker is appended. The cut never lands in the
    /// middle of a multi-byte UTF-8 character, so truncation does not introduce a
    /// replacement character of its own.
    fn new(bytes: Vec<u8>, max_bytes: usize) -> Self {
        let original_len = bytes.len();
        let truncated = original_len > max_bytes;
        let kept = if truncated {
            &bytes[..Self::char_boundary_cut(&bytes, max_bytes)]
        } else {
            &bytes[..]
        };
        let mut content = String::from_utf8_lossy(kept).into_owned();
        if truncated {
            content.push_str("\n[verification output truncated]\n");
        }
        Self {
            content,
            original_len: Some(original_len),
            truncated,
        }
    }

    /// Returns the largest cut position `<= limit` that does not split a UTF-8 character.
    ///
    /// `limit` must not exceed `bytes.len()`. Only the (at most three) bytes before
    /// `limit` are inspected: if the last lead byte among them announces a character
    /// longer than what fits before `limit`, the cut moves back to that lead byte.
    /// Data that is not UTF-8 there is left cut at `limit`.
    fn char_boundary_cut(bytes: &[u8], limit: usize) -> usize {
        for back in 1..=limit.min(3) {
            let index = limit - back;
            let byte = bytes[index];
            if byte & 0xC0 == 0x80 {
                // Continuation byte: the lead byte is further back.
                continue;
            }
            let width = match byte {
                0xC0..=0xDF => 2,
                0xE0..=0xEF => 3,
                0xF0..=0xF7 => 4,
                _ => 1,
            };
            return if width > back { index } else { limit };
        }
        limit
    }

    /// Returns a short, trimmed excerpt of the content for status reporting.
    ///
    /// Blank content yields `"<empty>"`; content longer than 500 characters is cut to
    /// 500 characters followed by `…`.
    fn summary(&self) -> String {
        let trimmed = self.content.trim();
        if trimmed.is_empty() {
            return "<empty>".into();
        }
        // The byte offset of the first character past the limit doubles as the cut point,
        // so the text is scanned once and only as far as needed.
        match trimmed.char_indices().nth(Self::SUMMARY_CHARS) {
            Some((cut, _)) => {
                let mut summary = String::with_capacity(cut + '…'.len_utf8());
                summary.push_str(&trimmed[..cut]);
                summary.push('…');
                summary
            }
            None => trimmed.to_string(),
        }
    }
}
301
#[cfg(test)]
mod tests {
    use super::*;
    use crate::workflow::VerificationGateStatus;

    /// Builds a runner that executes in `temp` and stores artifacts in `<temp>/artifacts`.
    fn runner_in(temp: &tempfile::TempDir) -> VerificationGateRunner {
        VerificationGateRunner::new(temp.path(), temp.path().join("artifacts"))
    }

    /// Path of the artifact `file` written for the gate whose directory is `gate_dir`.
    fn artifact_path(temp: &tempfile::TempDir, gate_dir: &str, file: &str) -> PathBuf {
        temp.path().join("artifacts").join(gate_dir).join(file)
    }

    /// A command exiting with 0 passes the gate and leaves all three artifacts behind.
    #[tokio::test]
    async fn command_gate_runner_passes_and_writes_artifacts() {
        let temp = tempfile::TempDir::new().unwrap();
        let mut gate = VerificationGate::command("pass", "printf 'hello' && printf 'warn' >&2");

        let result = runner_in(&temp).run(&mut gate).await.unwrap();

        assert_eq!(gate.status, VerificationGateStatus::Passed);
        assert_eq!(result.exit_code, Some(0));
        assert_eq!(result.stdout_summary.as_deref(), Some("hello"));
        assert_eq!(result.stderr_summary.as_deref(), Some("warn"));
        let has_stdout_artifact = gate
            .artifacts
            .iter()
            .any(|artifact| artifact.kind == "stdout");
        assert!(has_stdout_artifact);
        let stdout_log =
            std::fs::read_to_string(artifact_path(&temp, "pass", "stdout.log")).unwrap();
        assert_eq!(stdout_log, "hello");
        assert!(artifact_path(&temp, "pass", "status.json").exists());
    }

    /// A non-zero exit code fails the gate and is reported in the summary.
    #[tokio::test]
    async fn command_gate_runner_marks_failed_command() {
        let temp = tempfile::TempDir::new().unwrap();
        let mut gate = VerificationGate::command("fail", "printf 'bad' >&2; exit 7");

        let result = runner_in(&temp).run(&mut gate).await.unwrap();

        assert_eq!(gate.status, VerificationGateStatus::Failed);
        assert_eq!(result.exit_code, Some(7));
        assert!(result.summary.unwrap().contains("exit code 7"));
        let stderr_log =
            std::fs::read_to_string(artifact_path(&temp, "fail", "stderr.log")).unwrap();
        assert_eq!(stderr_log, "bad");
    }

    /// A command that outlives the timeout blocks the gate but still writes its status.
    #[tokio::test]
    async fn command_gate_runner_marks_timeout_blocked() {
        let temp = tempfile::TempDir::new().unwrap();
        let runner = runner_in(&temp).with_default_timeout(Duration::from_millis(50));
        let mut gate = VerificationGate::command("timeout", "sleep 2");

        let result = runner.run(&mut gate).await.unwrap();

        assert_eq!(gate.status, VerificationGateStatus::Blocked);
        assert_eq!(result.exit_code, None);
        assert!(result.summary.unwrap().contains("timed out"));
        assert!(artifact_path(&temp, "timeout", "status.json").exists());
    }

    /// Output beyond the capture limit is cut, marked, and flagged on the artifact.
    #[tokio::test]
    async fn command_gate_runner_truncates_large_output() {
        let temp = tempfile::TempDir::new().unwrap();
        let runner = runner_in(&temp).with_max_capture_bytes(5);
        let mut gate = VerificationGate::command("truncate", "printf 'abcdefghijklmnopqrstuvwxyz'");

        let result = runner.run(&mut gate).await.unwrap();

        assert_eq!(gate.status, VerificationGateStatus::Passed);
        assert!(result.stdout_summary.unwrap().contains("truncated"));
        let stdout_log =
            std::fs::read_to_string(artifact_path(&temp, "truncate", "stdout.log")).unwrap();
        assert!(stdout_log.starts_with("abcde"));
        assert!(stdout_log.contains("truncated"));
        let stdout_flagged = gate.artifacts.iter().any(|artifact| {
            artifact.kind == "stdout" && artifact.redaction.as_deref() == Some("output truncated")
        });
        assert!(stdout_flagged);
    }
}
383}