use anyhow::{Context, Result, bail};
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::time::Instant;
/// A pipeline definition: an ordered list of steps.
///
/// Deserialized by [`load`] from a TOML document; the tests in this file write the steps as
/// a `[[step]]` array of tables.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Pipeline {
/// Steps in declaration order; [`run`] iterates over them in this order.
pub step: Vec<PipelineStep>,
}
/// One step of a [`Pipeline`]: a command executed inside a sandbox built from an image.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PipelineStep {
/// Step name. [`load`] rejects duplicates, empty names, and names containing `/`, `\` or
/// `..`; [`run`] combines it with a prefix to form the sandbox name.
pub name: String,
/// Image reference handed to the VM manager when the step's sandbox is created.
pub image: String,
/// Command line for the step. [`run`] splits it on whitespace before executing it, so
/// shell quoting is not interpreted.
pub command: String,
/// Optional directory inside the sandbox that receives the files staged from an earlier
/// step's `output` before the command runs. Absent in the TOML means `None`.
#[serde(default)]
pub input: Option<String>,
/// Optional directory inside the sandbox whose files are copied to the host after the
/// command runs, for use as a later step's `input`. Absent in the TOML means `None`.
#[serde(default)]
pub output: Option<String>,
}
/// Outcome of a single pipeline step, as recorded by [`run`].
#[derive(Debug)]
// NOTE(review): presumably some fields are never read outside tests, hence the lint
// suppression — confirm it is still needed.
#[allow(dead_code)]
pub struct StepResult {
/// Name of the step this result belongs to (copied from `PipelineStep::name`).
pub name: String,
/// Wall-clock time of the step, measured by [`run`] from just before sandbox creation
/// until output collection has finished (sandbox stop/remove is not included).
pub duration: std::time::Duration,
/// `true` when executing the step's command returned `Ok`.
pub success: bool,
/// The command's output on success, or `"Error: <error>"` when execution failed.
pub output: String,
}
/// Reads the pipeline definition stored as TOML at `path` and validates it.
///
/// Validation guarantees that the returned pipeline has at least one step and that every
/// step name is unique, non-empty, and free of `/`, `\` and `..` (step names end up in
/// sandbox names and host directory names, so they must not be able to form a path).
///
/// # Errors
///
/// Returns an error when the file cannot be read, when it is not valid TOML for a
/// [`Pipeline`], when it declares no steps, or when a step name is duplicated or invalid.
pub fn load(path: &Path) -> Result<Pipeline> {
    let content = std::fs::read_to_string(path)
        .with_context(|| format!("Failed to read {}", path.display()))?;
    let pipeline: Pipeline = toml::from_str(&content)
        .with_context(|| format!("Failed to parse {}", path.display()))?;

    if pipeline.step.is_empty() {
        bail!("Pipeline has no steps");
    }

    let mut seen = std::collections::HashSet::new();
    for step in &pipeline.step {
        let name = step.name.as_str();
        if !seen.insert(name) {
            bail!("Duplicate step name: '{}'", name);
        }
        // An empty name gets its own message: the "must not contain" wording below would
        // be misleading for it.
        if name.is_empty() {
            bail!("Invalid step name '': must not be empty");
        }
        if name.contains('/') || name.contains('\\') || name.contains("..") {
            bail!(
                "Invalid step name '{}': must not contain '/', '\\', or '..'",
                name
            );
        }
    }

    Ok(pipeline)
}
pub async fn run(
pipeline: &Pipeline,
manager: &mut crate::vmm::VmManager,
prefix: &str,
) -> Result<Vec<StepResult>> {
let total_steps = pipeline.step.len();
let mut results = Vec::new();
let total_start = Instant::now();
let mut prev_output_host: Option<PathBuf> = None;
let mut temp_dirs: Vec<PathBuf> = Vec::new();
for (i, step) in pipeline.step.iter().enumerate() {
let step_num = i + 1;
let sandbox_name = format!("{}-{}", prefix, step.name);
eprint!(" [{}/{}] {:<20}", step_num, total_steps, step.name);
let step_start = Instant::now();
manager.create(&sandbox_name, &step.image, 1, 512).await?;
manager.start(&sandbox_name).await?;
if let (Some(input_dir), Some(host_output)) = (&step.input, &prev_output_host) {
if host_output.exists() {
for entry in std::fs::read_dir(host_output)? {
let entry = entry?;
if entry.file_type()?.is_file() {
let content = std::fs::read(entry.path())?;
let dest = format!(
"{}/{}",
input_dir.trim_end_matches('/'),
entry.file_name().to_string_lossy()
);
let mkdir_cmd =
vec!["mkdir".to_string(), "-p".to_string(), input_dir.clone()];
let _ = manager.exec_cmd(&sandbox_name, &mkdir_cmd).await;
manager.write_file(&sandbox_name, &dest, &content).await?;
}
}
}
}
let cmd: Vec<String> = step.command.split_whitespace().map(String::from).collect();
let exec_result = manager.exec_cmd(&sandbox_name, &cmd).await;
let success = exec_result.is_ok();
let output = match exec_result {
Ok(out) => out,
Err(e) => format!("Error: {}", e),
};
if let Some(ref output_dir) = step.output {
let host_dir =
std::env::temp_dir().join(format!("agentkernel-pipeline-{}", sandbox_name));
let _ = std::fs::create_dir_all(&host_dir);
let ls_cmd = vec!["ls".to_string(), "-1".to_string(), output_dir.clone()];
if let Ok(ls_output) = manager.exec_cmd(&sandbox_name, &ls_cmd).await {
for filename in ls_output.lines().filter(|l| !l.is_empty()) {
let src = format!("{}/{}", output_dir.trim_end_matches('/'), filename);
if let Ok(content) = manager.read_file(&sandbox_name, &src).await {
let _ = std::fs::write(host_dir.join(filename), content);
}
}
}
temp_dirs.push(host_dir.clone());
prev_output_host = Some(host_dir);
}
let duration = step_start.elapsed();
let _ = manager.stop(&sandbox_name).await;
let _ = manager.remove(&sandbox_name).await;
if success {
eprintln!(" done ({:.1}s)", duration.as_secs_f64());
} else {
eprintln!(" FAILED ({:.1}s)", duration.as_secs_f64());
}
results.push(StepResult {
name: step.name.clone(),
duration,
success,
output,
});
if !success {
bail!(
"Pipeline failed at step '{}' ({}/{})",
step.name,
step_num,
total_steps
);
}
}
let total_duration = total_start.elapsed();
eprintln!(" Done ({:.1}s total)", total_duration.as_secs_f64());
for dir in &temp_dirs {
let _ = std::fs::remove_dir_all(dir);
}
Ok(results)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Writes `contents` to `file_name` inside a fresh temporary directory and passes the
    /// resulting path to `load`. The directory lives until `load` has returned.
    fn load_from_str(file_name: &str, contents: &str) -> Result<Pipeline> {
        let dir = tempfile::TempDir::new().unwrap();
        let path = dir.path().join(file_name);
        std::fs::write(&path, contents).unwrap();
        load(&path)
    }

    /// A three-step pipeline parses with all fields in place.
    #[test]
    fn test_load_pipeline() {
        let pipeline = load_from_str(
            "pipeline.toml",
            r#"
[[step]]
name = "generate"
image = "python:3.12-alpine"
command = "python generate.py"
output = "/app/output/"
[[step]]
name = "process"
image = "node:22-alpine"
command = "node process.js"
input = "/app/input/"
output = "/app/results/"
[[step]]
name = "analyze"
image = "python:3.12-alpine"
command = "python analyze.py"
input = "/app/input/"
"#,
        )
        .unwrap();

        assert_eq!(pipeline.step.len(), 3);

        let first = &pipeline.step[0];
        assert_eq!(first.name, "generate");
        assert_eq!(first.image, "python:3.12-alpine");

        let second = &pipeline.step[1];
        assert_eq!(second.input.as_deref(), Some("/app/input/"));
        assert_eq!(second.output.as_deref(), Some("/app/results/"));

        assert!(pipeline.step[2].output.is_none());
    }

    /// A pipeline without steps is rejected.
    #[test]
    fn test_load_empty_pipeline() {
        assert!(load_from_str("empty.toml", "step = []\n").is_err());
    }

    /// Two steps sharing a name are rejected.
    #[test]
    fn test_load_duplicate_names() {
        let outcome = load_from_str(
            "dup.toml",
            r#"
[[step]]
name = "step1"
image = "alpine"
command = "echo a"
[[step]]
name = "step1"
image = "alpine"
command = "echo b"
"#,
        );
        assert!(outcome.is_err());
    }

    /// A path that does not exist yields an error instead of a panic.
    #[test]
    fn test_load_missing_file() {
        let missing = Path::new("/nonexistent/pipeline.toml");
        assert!(load(missing).is_err());
    }

    /// A step name containing `..` is rejected with the "Invalid step name" message.
    #[test]
    fn test_load_invalid_step_name_path_traversal() {
        let outcome = load_from_str(
            "bad.toml",
            r#"
[[step]]
name = "../escape"
image = "alpine"
command = "echo bad"
"#,
        );
        assert!(outcome.is_err());

        let message = outcome.unwrap_err().to_string();
        assert!(message.contains("Invalid step name"));
    }

    /// A step name containing `/` is rejected.
    #[test]
    fn test_load_invalid_step_name_slash() {
        let outcome = load_from_str(
            "slash.toml",
            r#"
[[step]]
name = "step/one"
image = "alpine"
command = "echo bad"
"#,
        );
        assert!(outcome.is_err());
    }

    /// A step survives a JSON serialize/deserialize round trip.
    #[test]
    fn test_pipeline_step_serde() {
        let original = PipelineStep {
            name: "test".to_string(),
            image: "alpine:3.20".to_string(),
            command: "echo hello".to_string(),
            input: Some("/in".to_string()),
            output: Some("/out".to_string()),
        };

        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: PipelineStep = serde_json::from_str(&encoded).unwrap();

        assert_eq!(decoded.name, "test");
        assert_eq!(decoded.input.as_deref(), Some("/in"));
    }

    /// `input` and `output` default to `None` when omitted.
    #[test]
    fn test_pipeline_step_defaults() {
        let toml_str = r#"
[[step]]
name = "simple"
image = "alpine"
command = "echo hi"
"#;
        let pipeline: Pipeline = toml::from_str(toml_str).unwrap();

        let only = &pipeline.step[0];
        assert!(only.input.is_none());
        assert!(only.output.is_none());
    }

    /// `StepResult` stores the values it was built with.
    #[test]
    fn test_step_result() {
        let recorded = StepResult {
            name: "test".to_string(),
            duration: std::time::Duration::from_millis(1500),
            success: true,
            output: "hello".to_string(),
        };

        assert!(recorded.success);
        assert_eq!(recorded.duration.as_millis(), 1500);
    }
}