synwire_agent/sandbox/
pipeline.rs1use std::time::Duration;
4
5const MAX_STAGE_OUTPUT_BYTES: usize = 1024 * 1024; use serde::{Deserialize, Serialize};
9use synwire_core::BoxFuture;
10use synwire_core::sandbox::PipelineStage;
11use synwire_core::vfs::error::VfsError;
12use synwire_core::vfs::types::ExecuteResponse;
13use tokio::io::AsyncWriteExt;
14use tokio::process::Command;
15use tokio::time::timeout;
16
17#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct PipelineResult {
20 pub stages: Vec<ExecuteResponse>,
22 pub exit_code: i32,
24}
25
26#[derive(Debug, Default, Clone)]
28pub struct PipelineExecutor;
29
30impl PipelineExecutor {
31 #[must_use]
33 pub const fn new() -> Self {
34 Self
35 }
36
37 pub fn execute<'a>(
39 &'a self,
40 stages: &'a [PipelineStage],
41 default_timeout: Duration,
42 ) -> BoxFuture<'a, Result<PipelineResult, VfsError>> {
43 Box::pin(async move {
44 if stages.is_empty() {
45 return Ok(PipelineResult {
46 stages: Vec::new(),
47 exit_code: 0,
48 });
49 }
50
51 let mut responses = Vec::new();
52 let mut stdin_data: Option<Vec<u8>> = None;
53
54 for stage in stages {
55 let stage_timeout = stage
56 .timeout_secs
57 .map_or(default_timeout, Duration::from_secs);
58
59 let mut cmd = Command::new(&stage.command);
60 let _ = cmd
61 .args(&stage.args)
62 .stdin(std::process::Stdio::piped())
63 .stdout(std::process::Stdio::piped())
64 .stderr(std::process::Stdio::piped());
65
66 let mut child = cmd.spawn().map_err(VfsError::Io)?;
67
68 if let Some(data) = stdin_data.take() {
70 if let Some(mut sin) = child.stdin.take() {
71 sin.write_all(&data).await.map_err(VfsError::Io)?;
72 drop(sin);
73 }
74 } else {
75 drop(child.stdin.take());
76 }
77
78 let output = timeout(stage_timeout, child.wait_with_output())
79 .await
80 .map_err(|_| {
81 VfsError::Timeout(format!(
82 "{} timed out after {stage_timeout:?}",
83 stage.command
84 ))
85 })?
86 .map_err(VfsError::Io)?;
87
88 let mut stdout = output.stdout;
89 let stderr = if stage.stderr_to_stdout {
90 stdout.extend_from_slice(&output.stderr);
91 String::new()
92 } else {
93 String::from_utf8_lossy(&output.stderr).into_owned()
94 };
95 stdout.truncate(MAX_STAGE_OUTPUT_BYTES);
97
98 let resp = ExecuteResponse {
99 exit_code: output.status.code().unwrap_or(-1),
100 stdout: String::from_utf8_lossy(&stdout).into_owned(),
101 stderr,
102 };
103
104 if resp.exit_code != 0 {
106 let exit_code = resp.exit_code;
107 responses.push(resp);
108 return Ok(PipelineResult {
109 exit_code,
110 stages: responses,
111 });
112 }
113
114 stdin_data = Some(stdout);
115 responses.push(resp);
116 }
117
118 let exit_code = responses.last().map_or(0, |r| r.exit_code);
119
120 Ok(PipelineResult {
121 stages: responses,
122 exit_code,
123 })
124 })
125 }
126
127 pub fn execute_to_file<'a>(
129 &'a self,
130 stages: &'a [PipelineStage],
131 output_file: &'a str,
132 default_timeout: Duration,
133 ) -> BoxFuture<'a, Result<PipelineResult, VfsError>> {
134 Box::pin(async move {
135 let result = self.execute(stages, default_timeout).await?;
136 if let Some(last) = result.stages.last() {
137 tokio::fs::write(output_file, last.stdout.as_bytes())
138 .await
139 .map_err(VfsError::Io)?;
140 }
141 Ok(result)
142 })
143 }
144}
145
146#[cfg(test)]
147#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
148mod tests {
149 use super::*;
150
151 fn stage(cmd: &str, args: &[&str]) -> PipelineStage {
152 PipelineStage {
153 command: cmd.to_string(),
154 args: args.iter().map(ToString::to_string).collect(),
155 stderr_to_stdout: false,
156 timeout_secs: None,
157 }
158 }
159
160 #[tokio::test]
161 async fn test_simple_pipeline() {
162 let executor = PipelineExecutor::new();
163 let stages = vec![stage("echo", &["hello world"]), stage("grep", &["hello"])];
164 let result = executor
165 .execute(&stages, Duration::from_secs(5))
166 .await
167 .expect("pipeline");
168 assert_eq!(result.exit_code, 0);
169 assert!(result.stages[1].stdout.contains("hello"));
170 }
171
172 #[tokio::test]
173 async fn test_pipeline_stage_failure_stops() {
174 let executor = PipelineExecutor::new();
175 let stages = vec![
176 stage("false", &[]), stage("echo", &["should not run"]),
178 ];
179 let result = executor
180 .execute(&stages, Duration::from_secs(5))
181 .await
182 .expect("pipeline");
183 assert_ne!(result.exit_code, 0);
184 assert_eq!(result.stages.len(), 1);
185 }
186
187 #[tokio::test]
188 async fn test_stderr_to_stdout_combined() {
189 let executor = PipelineExecutor::new();
190 let stages = vec![PipelineStage {
191 command: "sh".to_string(),
192 args: vec!["-c".to_string(), "echo out; echo err >&2".to_string()],
193 stderr_to_stdout: true,
194 timeout_secs: None,
195 }];
196 let result = executor
197 .execute(&stages, Duration::from_secs(5))
198 .await
199 .expect("pipeline");
200 assert_eq!(result.exit_code, 0);
201 assert!(result.stages[0].stdout.contains("out"));
202 }
203
204 #[tokio::test]
205 async fn test_per_stage_timeout() {
206 let executor = PipelineExecutor::new();
207 let stages = vec![PipelineStage {
208 command: "sleep".to_string(),
209 args: vec!["10".to_string()],
210 stderr_to_stdout: false,
211 timeout_secs: Some(1),
212 }];
213 let err = executor
214 .execute(&stages, Duration::from_secs(30))
215 .await
216 .expect_err("should timeout");
217 assert!(matches!(err, VfsError::Timeout(_)));
218 }
219}