Skip to main content

synwire_agent/sandbox/
pipeline.rs

1//! Multi-stage pipeline executor.
2
3use std::time::Duration;
4
5/// Maximum bytes of stdout buffered between pipeline stages.
6const MAX_STAGE_OUTPUT_BYTES: usize = 1024 * 1024; // 1 MiB
7
8use serde::{Deserialize, Serialize};
9use synwire_core::BoxFuture;
10use synwire_core::sandbox::PipelineStage;
11use synwire_core::vfs::error::VfsError;
12use synwire_core::vfs::types::ExecuteResponse;
13use tokio::io::AsyncWriteExt;
14use tokio::process::Command;
15use tokio::time::timeout;
16
17/// Result of a pipeline execution.
18#[derive(Debug, Clone, Serialize, Deserialize)]
19pub struct PipelineResult {
20    /// Per-stage responses.
21    pub stages: Vec<ExecuteResponse>,
22    /// Final combined exit code.
23    pub exit_code: i32,
24}
25
26/// Executes multi-stage command pipelines.
27#[derive(Debug, Default, Clone)]
28pub struct PipelineExecutor;
29
30impl PipelineExecutor {
31    /// Create a new pipeline executor.
32    #[must_use]
33    pub const fn new() -> Self {
34        Self
35    }
36
37    /// Execute a pipeline: each stage's stdout is piped to the next stage's stdin.
38    pub fn execute<'a>(
39        &'a self,
40        stages: &'a [PipelineStage],
41        default_timeout: Duration,
42    ) -> BoxFuture<'a, Result<PipelineResult, VfsError>> {
43        Box::pin(async move {
44            if stages.is_empty() {
45                return Ok(PipelineResult {
46                    stages: Vec::new(),
47                    exit_code: 0,
48                });
49            }
50
51            let mut responses = Vec::new();
52            let mut stdin_data: Option<Vec<u8>> = None;
53
54            for stage in stages {
55                let stage_timeout = stage
56                    .timeout_secs
57                    .map_or(default_timeout, Duration::from_secs);
58
59                let mut cmd = Command::new(&stage.command);
60                let _ = cmd
61                    .args(&stage.args)
62                    .stdin(std::process::Stdio::piped())
63                    .stdout(std::process::Stdio::piped())
64                    .stderr(std::process::Stdio::piped());
65
66                let mut child = cmd.spawn().map_err(VfsError::Io)?;
67
68                // Feed previous stage's output as stdin.
69                if let Some(data) = stdin_data.take() {
70                    if let Some(mut sin) = child.stdin.take() {
71                        sin.write_all(&data).await.map_err(VfsError::Io)?;
72                        drop(sin);
73                    }
74                } else {
75                    drop(child.stdin.take());
76                }
77
78                let output = timeout(stage_timeout, child.wait_with_output())
79                    .await
80                    .map_err(|_| {
81                        VfsError::Timeout(format!(
82                            "{} timed out after {stage_timeout:?}",
83                            stage.command
84                        ))
85                    })?
86                    .map_err(VfsError::Io)?;
87
88                let mut stdout = output.stdout;
89                let stderr = if stage.stderr_to_stdout {
90                    stdout.extend_from_slice(&output.stderr);
91                    String::new()
92                } else {
93                    String::from_utf8_lossy(&output.stderr).into_owned()
94                };
95                // Cap buffered output to avoid excessive memory use between stages.
96                stdout.truncate(MAX_STAGE_OUTPUT_BYTES);
97
98                let resp = ExecuteResponse {
99                    exit_code: output.status.code().unwrap_or(-1),
100                    stdout: String::from_utf8_lossy(&stdout).into_owned(),
101                    stderr,
102                };
103
104                // If stage failed, stop the pipeline.
105                if resp.exit_code != 0 {
106                    let exit_code = resp.exit_code;
107                    responses.push(resp);
108                    return Ok(PipelineResult {
109                        exit_code,
110                        stages: responses,
111                    });
112                }
113
114                stdin_data = Some(stdout);
115                responses.push(resp);
116            }
117
118            let exit_code = responses.last().map_or(0, |r| r.exit_code);
119
120            Ok(PipelineResult {
121                stages: responses,
122                exit_code,
123            })
124        })
125    }
126
127    /// Redirect final stage stdout to a file.
128    pub fn execute_to_file<'a>(
129        &'a self,
130        stages: &'a [PipelineStage],
131        output_file: &'a str,
132        default_timeout: Duration,
133    ) -> BoxFuture<'a, Result<PipelineResult, VfsError>> {
134        Box::pin(async move {
135            let result = self.execute(stages, default_timeout).await?;
136            if let Some(last) = result.stages.last() {
137                tokio::fs::write(output_file, last.stdout.as_bytes())
138                    .await
139                    .map_err(VfsError::Io)?;
140            }
141            Ok(result)
142        })
143    }
144}
145
146#[cfg(test)]
147#[allow(clippy::unwrap_used, clippy::expect_used, clippy::panic)]
148mod tests {
149    use super::*;
150
151    fn stage(cmd: &str, args: &[&str]) -> PipelineStage {
152        PipelineStage {
153            command: cmd.to_string(),
154            args: args.iter().map(ToString::to_string).collect(),
155            stderr_to_stdout: false,
156            timeout_secs: None,
157        }
158    }
159
160    #[tokio::test]
161    async fn test_simple_pipeline() {
162        let executor = PipelineExecutor::new();
163        let stages = vec![stage("echo", &["hello world"]), stage("grep", &["hello"])];
164        let result = executor
165            .execute(&stages, Duration::from_secs(5))
166            .await
167            .expect("pipeline");
168        assert_eq!(result.exit_code, 0);
169        assert!(result.stages[1].stdout.contains("hello"));
170    }
171
172    #[tokio::test]
173    async fn test_pipeline_stage_failure_stops() {
174        let executor = PipelineExecutor::new();
175        let stages = vec![
176            stage("false", &[]), // always fails
177            stage("echo", &["should not run"]),
178        ];
179        let result = executor
180            .execute(&stages, Duration::from_secs(5))
181            .await
182            .expect("pipeline");
183        assert_ne!(result.exit_code, 0);
184        assert_eq!(result.stages.len(), 1);
185    }
186
187    #[tokio::test]
188    async fn test_stderr_to_stdout_combined() {
189        let executor = PipelineExecutor::new();
190        let stages = vec![PipelineStage {
191            command: "sh".to_string(),
192            args: vec!["-c".to_string(), "echo out; echo err >&2".to_string()],
193            stderr_to_stdout: true,
194            timeout_secs: None,
195        }];
196        let result = executor
197            .execute(&stages, Duration::from_secs(5))
198            .await
199            .expect("pipeline");
200        assert_eq!(result.exit_code, 0);
201        assert!(result.stages[0].stdout.contains("out"));
202    }
203
204    #[tokio::test]
205    async fn test_per_stage_timeout() {
206        let executor = PipelineExecutor::new();
207        let stages = vec![PipelineStage {
208            command: "sleep".to_string(),
209            args: vec!["10".to_string()],
210            stderr_to_stdout: false,
211            timeout_secs: Some(1),
212        }];
213        let err = executor
214            .execute(&stages, Duration::from_secs(30))
215            .await
216            .expect_err("should timeout");
217        assert!(matches!(err, VfsError::Timeout(_)));
218    }
219}