use std::process::Command;
/// Runs the `run_dot` binary via `cargo run` with `args` and no additional
/// environment variables, returning the captured process output.
fn run_run_dot(args: &[&str]) -> std::process::Output {
    let no_extra_env: &[(&str, &str)] = &[];
    run_run_dot_with_env(args, no_extra_env)
}
/// Spawns `cargo run --bin run_dot -- <args>` with each `(key, value)` pair
/// of `env_add` applied to the child's environment, then waits for it and
/// returns its captured output.
///
/// # Panics
/// Panics if the `cargo` process cannot be run or its output collected.
fn run_run_dot_with_env(args: &[&str], env_add: &[(&str, &str)]) -> std::process::Output {
    let mut cargo = Command::new("cargo");
    cargo.args(["run", "--bin", "run_dot", "--"]);
    cargo.args(args);
    cargo.envs(env_add.iter().copied());
    cargo.output().expect("run cargo run --bin run_dot")
}
/// With no arguments, `run_dot` must exit unsuccessfully and print a usage
/// hint on stderr that mentions `run_dot` or a `.dot` file.
#[test]
fn run_dot_prints_usage_without_args() {
    let out = run_run_dot(&[]);
    let stderr = String::from_utf8_lossy(&out.stderr);
    // Every assertion carries stderr so a failure is diagnosable from the
    // test output alone, matching the other CLI tests in this file.
    assert!(
        !out.status.success(),
        "run_dot without arguments should not succeed; stderr: {}",
        stderr
    );
    assert!(
        stderr.contains("Usage") || stderr.contains("usage"),
        "stderr should contain a usage hint; stderr: {}",
        stderr
    );
    assert!(
        stderr.contains("run_dot") || stderr.contains(".dot"),
        "usage hint should mention run_dot or a .dot file; stderr: {}",
        stderr
    );
}
/// Running `run_dot` on a path that does not exist must fail and report
/// the problem on stderr.
#[test]
fn run_dot_exits_1_for_missing_file() {
    let output = run_run_dot(&["/nonexistent/path.dot"]);
    assert!(!output.status.success());
    let err_text = String::from_utf8_lossy(&output.stderr);
    let mentions_error = ["Error", "error", "reading"]
        .iter()
        .any(|needle| err_text.contains(*needle));
    assert!(mentions_error, "stderr: {}", err_text);
}
/// `--dry-run` on a minimal start -> exit graph must succeed and print the
/// dry-run confirmation line on stdout.
#[test]
fn run_dot_dry_run_valid_dot_exits_0() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let dot_file = tmp.path().join("minimal.dot");
    let dot_source = r#"digraph G {
graph [goal="test"]
start [shape=Mdiamond]
exit [shape=Msquare]
start -> exit
}"#;
    std::fs::write(&dot_file, dot_source).expect("write dot");
    let dot_arg = dot_file.to_str().expect("path");

    let output = run_run_dot(&["--dry-run", dot_arg]);
    assert!(
        output.status.success(),
        "stderr: {} stdout: {}",
        String::from_utf8_lossy(&output.stderr),
        String::from_utf8_lossy(&output.stdout)
    );

    let out_text = String::from_utf8_lossy(&output.stdout);
    let confirmed = out_text.contains("Dry-run: parse and validate OK.");
    assert!(confirmed, "stdout: {}", out_text);
}
/// `--dry-run` on a path that does not exist must fail and report the
/// problem on stderr.
#[test]
fn run_dot_dry_run_missing_file_exits_1() {
    let output = run_run_dot(&["--dry-run", "/nonexistent/path.dot"]);
    assert!(!output.status.success());
    let err_text = String::from_utf8_lossy(&output.stderr);
    let reported = err_text.contains("Error")
        || err_text.contains("error")
        || err_text.contains("reading");
    assert!(reported, "stderr: {}", err_text);
}
/// `--dry-run` on a file whose content is not a digraph must fail and
/// report an error on stderr.
#[test]
fn run_dot_dry_run_invalid_dot_exits_1() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let bad_file = tmp.path().join("invalid.dot");
    std::fs::write(&bad_file, "not a valid digraph").expect("write");
    let bad_arg = bad_file.to_str().expect("path");

    let output = run_run_dot(&["--dry-run", bad_arg]);
    assert!(!output.status.success());

    let err_text = String::from_utf8_lossy(&output.stderr);
    let reported = ["Error", "error", "parsing", "DOT"]
        .iter()
        .any(|needle| err_text.contains(*needle));
    assert!(reported, "stderr: {}", err_text);
}
/// Running a minimal start -> exit graph must exit successfully and print
/// "Pipeline completed" on stdout.
#[test]
fn run_dot_succeeds_with_minimal_start_exit_dot() {
    let dir = tempfile::tempdir().expect("temp dir");
    let path = dir.path().join("minimal.dot");
    std::fs::write(
        &path,
        r#"digraph G {
graph [goal="test"]
start [shape=Mdiamond]
exit [shape=Msquare]
start -> exit
}"#,
    )
    .expect("write dot");
    let path_str = path.to_str().expect("path");
    let out = run_run_dot(&[path_str]);
    assert!(
        out.status.success(),
        "stderr: {} stdout: {}",
        String::from_utf8_lossy(&out.stderr),
        String::from_utf8_lossy(&out.stdout)
    );
    let stdout = String::from_utf8_lossy(&out.stdout);
    // A former follow-up check for "Success" or "completed" was dropped: any
    // stdout containing "Pipeline completed" already contains "completed",
    // so that assertion could never fail on its own.
    assert!(
        stdout.contains("Pipeline completed"),
        "stdout should report 'Pipeline completed': {}",
        stdout
    );
}
/// After a successful run of the minimal graph, stdout must contain the
/// run summary labels, with two nodes reported and zero failures.
#[test]
fn run_dot_prints_run_summary_after_pipeline() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let dot_file = tmp.path().join("minimal.dot");
    let dot_source = r#"digraph G {
graph [goal="summary"]
start [shape=Mdiamond]
exit [shape=Msquare]
start -> exit
}"#;
    std::fs::write(&dot_file, dot_source).expect("write dot");
    let dot_arg = dot_file.to_str().expect("path");

    let output = run_run_dot(&[dot_arg]);
    assert!(
        output.status.success(),
        "stderr: {} stdout: {}",
        String::from_utf8_lossy(&output.stderr),
        String::from_utf8_lossy(&output.stdout)
    );

    let stdout = String::from_utf8_lossy(&output.stdout);
    // Each summary label must appear somewhere in stdout.
    for label in ["Nodes:", "Success:", "Failed:", "Duration:"] {
        assert!(
            stdout.contains(label),
            "stdout should contain run summary '{}': {}",
            label,
            stdout
        );
    }

    let two_nodes = stdout.contains("Nodes: 2") || stdout.contains("Nodes:2");
    assert!(two_nodes, "minimal graph has 2 completed nodes: {}", stdout);

    let zero_failed = stdout.contains("Failed: 0") || stdout.contains("Failed:0");
    assert!(zero_failed, "minimal success path has 0 failed: {}", stdout);
}
/// With `--json`, stdout must contain a single-line JSON object carrying
/// the outcome, counters and timestamps, and the human-readable summary
/// must not be printed.
#[test]
fn run_dot_json_outputs_valid_json_with_expected_fields() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let dot_file = tmp.path().join("minimal.dot");
    let dot_source = r#"digraph G {
graph [goal="json-test"]
start [shape=Mdiamond]
exit [shape=Msquare]
start -> exit
}"#;
    std::fs::write(&dot_file, dot_source).expect("write dot");
    let dot_arg = dot_file.to_str().expect("path");

    let output = run_run_dot(&["--json", dot_arg]);
    assert!(
        output.status.success(),
        "run_dot --json should exit 0: stderr: {} stdout: {}",
        String::from_utf8_lossy(&output.stderr),
        String::from_utf8_lossy(&output.stdout)
    );

    let stdout = String::from_utf8_lossy(&output.stdout);
    // Pick the first stdout line that looks like the JSON result object.
    let trimmed = stdout
        .lines()
        .find(|line| line.trim().starts_with('{') && line.contains("\"outcome\""))
        .expect("stdout must contain a JSON object with outcome key")
        .trim();
    let parsed: serde_json::Value =
        serde_json::from_str(trimmed).expect("stdout must be valid JSON");
    let obj = parsed.as_object().expect("root must be a JSON object");

    assert!(
        obj.contains_key("outcome"),
        "JSON must contain 'outcome': {}",
        trimmed
    );
    let outcome = obj.get("outcome").and_then(|v| v.as_str());
    assert_eq!(outcome, Some("success"));

    for key in [
        "nodes_run",
        "success_count",
        "failed_count",
        "started_at",
        "finished_at",
    ] {
        assert!(
            obj.contains_key(key),
            "JSON must contain '{}': {}",
            key,
            trimmed
        );
    }

    let nodes_run = obj.get("nodes_run").and_then(|v| v.as_u64());
    assert_eq!(nodes_run, Some(2));
    let failed_count = obj.get("failed_count").and_then(|v| v.as_u64());
    assert_eq!(failed_count, Some(0));

    let human_summary_absent =
        !stdout.contains("Nodes:") && !stdout.contains("Pipeline completed");
    assert!(
        human_summary_absent,
        "with --json, human summary must be suppressed: {}",
        stdout
    );
}
/// `--execution-log <path>` must create a JSON log at that path recording
/// version 1, the graph goal and a successful final status.
#[test]
fn run_dot_execution_log_cli_writes_log_file() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let dot_file = tmp.path().join("minimal.dot");
    let log_file = tmp.path().join("execution.log.json");
    let dot_source = r#"digraph G {
graph [goal="test"]
start [shape=Mdiamond]
exit [shape=Msquare]
start -> exit
}"#;
    std::fs::write(&dot_file, dot_source).expect("write dot");
    let dot_arg = dot_file.to_str().expect("path");
    let log_arg = log_file.to_str().expect("log path");

    let output = run_run_dot(&["--execution-log", log_arg, dot_arg]);
    assert!(
        output.status.success(),
        "stderr: {} stdout: {}",
        String::from_utf8_lossy(&output.stderr),
        String::from_utf8_lossy(&output.stdout)
    );
    assert!(log_file.exists(), "execution log file should exist");

    let raw = std::fs::read_to_string(&log_file).expect("read execution log");
    let log: serde_json::Value = serde_json::from_str(&raw).expect("parse execution log JSON");
    assert_eq!(log["version"], 1);
    assert_eq!(log["goal"], "test");
    assert_eq!(log["final_status"], "success");
}
/// A first run writes an execution log into the run directory; a second
/// run with `--resume <run_dir>` must then succeed and report either that
/// there is nothing to resume or that the pipeline completed.
#[test]
fn run_dot_resume_cli_completes_from_execution_log() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let run_dir = tmp.path();
    let log_file = run_dir.join("execution.log.json");
    let dot_file = run_dir.join("minimal.dot");
    let dot_source = r#"digraph G {
graph [goal="resume-cli"]
start [shape=Mdiamond]
exit [shape=Msquare]
start -> exit
}"#;
    std::fs::write(&dot_file, dot_source).expect("write dot");
    let dot_arg = dot_file.to_str().expect("path");
    let run_dir_arg = run_dir.to_str().expect("run dir");
    let log_arg = log_file.to_str().expect("log path");

    // First run: produce the execution log.
    let first_args = ["--run-dir", run_dir_arg, "--execution-log", log_arg, dot_arg];
    let first = run_run_dot(&first_args);
    assert!(
        first.status.success(),
        "first run should succeed: {}",
        String::from_utf8_lossy(&first.stderr)
    );
    assert!(log_file.exists(), "execution log should exist");

    // Second run: resume from the same run directory.
    let second = run_run_dot(&["--resume", run_dir_arg, dot_arg]);
    assert!(
        second.status.success(),
        "resume run should succeed: {}",
        String::from_utf8_lossy(&second.stderr)
    );
    let resumed_stdout = String::from_utf8_lossy(&second.stdout);
    let reported = resumed_stdout.contains("Nothing to resume")
        || resumed_stdout.contains("Pipeline completed");
    assert!(
        reported,
        "resume should report already completed or completed: {}",
        resumed_stdout
    );
}
/// After a successful run that wrote an execution log into the run
/// directory, `--summary --run-dir <dir>` must exit successfully and print
/// the outcome, a steps line and a duration line.
#[test]
fn run_dot_summary_prints_outcome_step_count_duration() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let run_dir = tmp.path();
    let log_file = run_dir.join("execution.log.json");
    let dot_file = run_dir.join("minimal.dot");
    let dot_source = r#"digraph G {
graph [goal="summary-test"]
start [shape=Mdiamond]
exit [shape=Msquare]
start -> exit
}"#;
    std::fs::write(&dot_file, dot_source).expect("write dot");
    let dot_arg = dot_file.to_str().expect("path");
    let run_dir_arg = run_dir.to_str().expect("run dir");
    let log_arg = log_file.to_str().expect("log path");

    // Produce the execution log that the summary will be derived from.
    let run_args = ["--run-dir", run_dir_arg, "--execution-log", log_arg, dot_arg];
    let run_output = run_run_dot(&run_args);
    assert!(
        run_output.status.success(),
        "run should succeed: {}",
        String::from_utf8_lossy(&run_output.stderr)
    );
    assert!(log_file.exists(), "execution log should exist");

    let summary_output = run_run_dot(&["--summary", "--run-dir", run_dir_arg, dot_arg]);
    assert!(
        summary_output.status.success(),
        "summary should exit 0 for success run: {}",
        String::from_utf8_lossy(&summary_output.stderr)
    );

    let summary_text = String::from_utf8_lossy(&summary_output.stdout);
    assert!(
        summary_text.contains("outcome: success"),
        "stdout should contain outcome: success: {}",
        summary_text
    );
    let has_steps_and_duration =
        summary_text.contains("steps:") && summary_text.contains("duration:");
    assert!(
        has_steps_and_duration,
        "stdout should contain steps and duration: {}",
        summary_text
    );
}
/// `--execution-log` given without a path, together with `--stage-dir`,
/// must write the log to `<stage_dir>/execution.log.json`.
#[test]
fn run_dot_execution_log_cli_default_path_under_stage_dir() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let stage_dir = tmp.path().join("stage");
    std::fs::create_dir_all(&stage_dir).expect("create stage dir");
    let dot_file = tmp.path().join("minimal.dot");
    let dot_source = r#"digraph G {
graph [goal="test"]
start [shape=Mdiamond]
exit [shape=Msquare]
start -> exit
}"#;
    std::fs::write(&dot_file, dot_source).expect("write dot");
    let dot_arg = dot_file.to_str().expect("path");
    let stage_arg = stage_dir.to_str().expect("stage");

    let output = run_run_dot(&["--execution-log", "--stage-dir", stage_arg, dot_arg]);
    assert!(
        output.status.success(),
        "stderr: {} stdout: {}",
        String::from_utf8_lossy(&output.stderr),
        String::from_utf8_lossy(&output.stdout)
    );

    let expected_log = stage_dir.join("execution.log.json");
    assert!(
        expected_log.exists(),
        "execution log should be at <stage_dir>/execution.log.json"
    );
    let raw = std::fs::read_to_string(&expected_log).expect("read execution log");
    let log: serde_json::Value = serde_json::from_str(&raw).expect("parse execution log JSON");
    assert_eq!(log["final_status"], "success");
}
/// With `ATTRACTOR_EXECUTION_LOG=1` and `--stage-dir`, the log must be
/// written to `<stage_dir>/execution.log.json` with a successful status.
#[test]
fn run_dot_execution_log_env_1_uses_default_path() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let stage_dir = tmp.path().join("stage");
    std::fs::create_dir_all(&stage_dir).expect("create stage dir");
    let dot_file = tmp.path().join("minimal.dot");
    let dot_source = r#"digraph G {
graph [goal="test"]
start [shape=Mdiamond]
exit [shape=Msquare]
start -> exit
}"#;
    std::fs::write(&dot_file, dot_source).expect("write dot");
    let dot_arg = dot_file.to_str().expect("path");
    let stage_arg = stage_dir.to_str().expect("stage");

    let cli_args = ["--stage-dir", stage_arg, dot_arg];
    let env = [("ATTRACTOR_EXECUTION_LOG", "1")];
    let output = run_run_dot_with_env(&cli_args, &env);
    assert!(
        output.status.success(),
        "stderr: {} stdout: {}",
        String::from_utf8_lossy(&output.stderr),
        String::from_utf8_lossy(&output.stdout)
    );

    let expected_log = stage_dir.join("execution.log.json");
    assert!(
        expected_log.exists(),
        "ATTRACTOR_EXECUTION_LOG=1 should write to <stage_dir>/execution.log.json"
    );
    let raw = std::fs::read_to_string(&expected_log).expect("read execution log");
    let log: serde_json::Value = serde_json::from_str(&raw).expect("parse execution log JSON");
    assert_eq!(log["final_status"], "success");
}
/// With `ATTRACTOR_EXECUTION_LOG=true` and `--stage-dir`, the log must be
/// written to `<stage_dir>/execution.log.json`.
#[test]
fn run_dot_execution_log_env_true_uses_default_path() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let stage_dir = tmp.path().join("stage");
    std::fs::create_dir_all(&stage_dir).expect("create stage dir");
    let dot_file = tmp.path().join("minimal.dot");
    let dot_source = r#"digraph G {
graph [goal="test"]
start [shape=Mdiamond]
exit [shape=Msquare]
start -> exit
}"#;
    std::fs::write(&dot_file, dot_source).expect("write dot");
    let dot_arg = dot_file.to_str().expect("path");
    let stage_arg = stage_dir.to_str().expect("stage");

    let cli_args = ["--stage-dir", stage_arg, dot_arg];
    let env = [("ATTRACTOR_EXECUTION_LOG", "true")];
    let output = run_run_dot_with_env(&cli_args, &env);
    assert!(
        output.status.success(),
        "stderr: {} stdout: {}",
        String::from_utf8_lossy(&output.stderr),
        String::from_utf8_lossy(&output.stdout)
    );

    let expected_log = stage_dir.join("execution.log.json");
    assert!(
        expected_log.exists(),
        "ATTRACTOR_EXECUTION_LOG=true should write to <stage_dir>/execution.log.json"
    );
}
/// When `ATTRACTOR_EXECUTION_LOG` holds a file path, the execution log
/// must be written to exactly that path with a successful status.
#[test]
fn run_dot_execution_log_env_path_uses_that_path() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let dot_file = tmp.path().join("minimal.dot");
    let log_file = tmp.path().join("env_log.json");
    let dot_source = r#"digraph G {
graph [goal="test"]
start [shape=Mdiamond]
exit [shape=Msquare]
start -> exit
}"#;
    std::fs::write(&dot_file, dot_source).expect("write dot");
    let dot_arg = dot_file.to_str().expect("path");
    let log_arg = log_file.to_str().expect("log path");

    let env = [("ATTRACTOR_EXECUTION_LOG", log_arg)];
    let output = run_run_dot_with_env(&[dot_arg], &env);
    assert!(
        output.status.success(),
        "stderr: {} stdout: {}",
        String::from_utf8_lossy(&output.stderr),
        String::from_utf8_lossy(&output.stdout)
    );
    assert!(
        log_file.exists(),
        "ATTRACTOR_EXECUTION_LOG=<path> should write to that path"
    );

    let raw = std::fs::read_to_string(&log_file).expect("read execution log");
    let log: serde_json::Value = serde_json::from_str(&raw).expect("parse execution log JSON");
    assert_eq!(log["final_status"], "success");
}
/// When both `--execution-log <path>` and `ATTRACTOR_EXECUTION_LOG=<path>`
/// are supplied, only the CLI path must be written.
#[test]
fn run_dot_execution_log_cli_overrides_env() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let dot_file = tmp.path().join("minimal.dot");
    let cli_log = tmp.path().join("cli_log.json");
    let env_log = tmp.path().join("env_log.json");
    let dot_source = r#"digraph G {
graph [goal="test"]
start [shape=Mdiamond]
exit [shape=Msquare]
start -> exit
}"#;
    std::fs::write(&dot_file, dot_source).expect("write dot");
    let dot_arg = dot_file.to_str().expect("path");
    let cli_log_arg = cli_log.to_str().expect("cli log path");
    let env_log_arg = env_log.to_str().expect("env log path");

    let cli_args = ["--execution-log", cli_log_arg, dot_arg];
    let env = [("ATTRACTOR_EXECUTION_LOG", env_log_arg)];
    let output = run_run_dot_with_env(&cli_args, &env);
    assert!(
        output.status.success(),
        "stderr: {} stdout: {}",
        String::from_utf8_lossy(&output.stderr),
        String::from_utf8_lossy(&output.stdout)
    );

    assert!(cli_log.exists(), "CLI --execution-log path should be used");
    let env_log_written = env_log.exists();
    assert!(
        !env_log_written,
        "env path should be ignored when --execution-log is set"
    );
}
/// A workflow whose exec node runs `sleep 10` must, under `--timeout 1`,
/// fail with exit code 124 and mention the timeout on stderr.
#[test]
fn run_dot_timeout_exits_124() {
    let tmp = tempfile::tempdir().expect("temp dir");
    let dot_file = tmp.path().join("sleep.dot");
    let dot_source = r#"digraph G {
graph [goal="timeout test"]
start [shape=Mdiamond]
exit [shape=Msquare]
slow [type=exec, command="sleep 10"]
start -> slow -> exit
}"#;
    std::fs::write(&dot_file, dot_source).expect("write dot");
    let dot_arg = dot_file.to_str().expect("path");

    let output = run_run_dot(&["--timeout", "1", dot_arg]);
    assert!(
        !output.status.success(),
        "run_dot with --timeout 1 and sleeping workflow should not succeed; stderr: {}",
        String::from_utf8_lossy(&output.stderr)
    );

    let exit_code = output.status.code();
    assert_eq!(
        exit_code,
        Some(124),
        "expected exit code 124 on timeout; stderr: {}",
        String::from_utf8_lossy(&output.stderr)
    );

    let err_text = String::from_utf8_lossy(&output.stderr);
    let mentions_timeout = err_text.contains("timed out") || err_text.contains("timeout");
    assert!(
        mentions_timeout,
        "stderr should mention timeout; stderr: {}",
        err_text
    );
}
/// Running a minimal graph through `run_compiled_graph` with
/// `execution_log_path` set must write a JSON log at that path containing
/// version 1, the graph goal, a successful final status, a `steps` array
/// and a non-empty `completed_nodes` array.
#[tokio::test]
async fn execution_log_path_writes_execution_log_json() {
    let dir = tempfile::tempdir().expect("temp dir");
    let log_path = dir.path().join("execution.log.json");
    let dot = r#"
digraph G {
graph [goal="exec log test"]
start [shape=Mdiamond]
exit [shape=Msquare]
start -> exit
}
"#;
    let ast = streamweave_attractor::dot_parser::parse_dot(dot).expect("parse dot");
    let _ = streamweave_attractor::run_compiled_graph(
        &ast,
        streamweave_attractor::RunOptions {
            run_dir: None,
            resume_state: None,
            resume_already_completed: false,
            agent_cmd: None,
            stage_dir: None,
            execution_log_path: Some(log_path.clone()),
        },
    )
    .await
    .expect("run_compiled_graph");
    let content = std::fs::read_to_string(&log_path).expect("read execution log");
    let log: serde_json::Value = serde_json::from_str(&content).expect("parse execution log JSON");
    assert_eq!(log["version"], 1);
    assert_eq!(log["goal"], "exec log test");
    assert_eq!(log["final_status"], "success");
    // Only the presence of the array is required; its length is deliberately
    // not checked. The message states the condition that actually failed.
    assert!(
        log["steps"].as_array().is_some(),
        "execution log should contain a `steps` array (it may be empty)"
    );
    assert!(
        log["completed_nodes"]
            .as_array()
            .map(|a| !a.is_empty())
            .unwrap_or(false),
        "completed run should have completed_nodes"
    );
}
/// Runs a pre-push style graph (two exec nodes running `true`, each with a
/// fail branch to a "Fix" node) and checks that the run produced either a
/// non-empty context or outcome notes.
#[tokio::test]
async fn pre_push_dot_via_run_compiled_graph() {
    let dot_source = r#"
digraph PrePush {
graph [goal="test"]
rankdir=LR
start [shape=Mdiamond]
exit [shape=Msquare]
pre_push [type=exec, command="true"]
test_coverage [type=exec, command="true"]
fix_pre_push [label="Fix"]
fix_test_coverage [label="Fix"]
start -> pre_push
pre_push -> test_coverage [condition="outcome=success"]
pre_push -> fix_pre_push [condition="outcome=fail"]
test_coverage -> exit [condition="outcome=success"]
test_coverage -> fix_test_coverage [condition="outcome=fail"]
}
"#;
    let graph = streamweave_attractor::dot_parser::parse_dot(dot_source).expect("parse dot");
    let options = streamweave_attractor::RunOptions {
        run_dir: None,
        resume_state: None,
        resume_already_completed: false,
        agent_cmd: None,
        stage_dir: None,
        execution_log_path: None,
    };
    let run = streamweave_attractor::run_compiled_graph(&graph, options)
        .await
        .expect("run_compiled_graph");

    let produced_something = !run.context.is_empty() || run.last_outcome.notes.is_some();
    assert!(produced_something, "expected context or outcome notes");
}
/// A failing exec node (`false`) with an `outcome=fail` edge to `fix` must
/// leave both `fail_step` and `fix` in the completed node list.
#[tokio::test]
async fn test_out_error_dot_error_path_then_fix_to_exit() {
    let dot_source = r#"
digraph TestOutError {
graph [goal="test out/error ports"]
start [shape=Mdiamond]
exit [shape=Msquare]
ok [type=exec, command="true"]
fail_step [type=exec, command="false"]
fix [type=exec, command="true"]
start -> ok
ok -> fail_step
fail_step -> exit [condition="outcome=success"]
fail_step -> fix [condition="outcome=fail"]
fix -> exit
}
"#;
    let graph = streamweave_attractor::dot_parser::parse_dot(dot_source).expect("parse dot");
    let options = streamweave_attractor::RunOptions {
        run_dir: None,
        resume_state: None,
        resume_already_completed: false,
        agent_cmd: None,
        stage_dir: None,
        execution_log_path: None,
    };
    let run = streamweave_attractor::run_compiled_graph(&graph, options)
        .await
        .expect("run_compiled_graph");

    let fix_id = String::from("fix");
    assert!(
        run.completed_nodes.contains(&fix_id),
        "fix node should run (error port from fail_step → fix → exit); completed: {:?}",
        run.completed_nodes
    );
    let fail_step_id = String::from("fail_step");
    assert!(
        run.completed_nodes.contains(&fail_step_id),
        "fail_step should complete (then error path to fix); completed: {:?}",
        run.completed_nodes
    );
}
/// Running with both `run_dir` and `execution_log_path` set must leave a
/// loadable execution log with version 1, the graph goal, a `finished_at`
/// timestamp and at least one completed node.
#[tokio::test]
async fn run_dir_writes_execution_log() {
    let dot_source = r#"
digraph G {
graph [goal="resume-test"]
start [shape=Mdiamond]
exit [shape=Msquare]
start -> exit
}
"#;
    let graph = streamweave_attractor::dot_parser::parse_dot(dot_source).expect("parse dot");
    let run_dir = tempfile::tempdir().expect("temp dir");
    let log_file = run_dir
        .path()
        .join(streamweave_attractor::execution_log_io::EXECUTION_LOG_FILENAME);

    let options = streamweave_attractor::RunOptions {
        run_dir: Some(run_dir.path()),
        resume_state: None,
        resume_already_completed: false,
        agent_cmd: None,
        stage_dir: None,
        execution_log_path: Some(log_file.clone()),
    };
    streamweave_attractor::run_compiled_graph(&graph, options)
        .await
        .expect("run_compiled_graph");

    assert!(
        log_file.exists(),
        "execution.log.json should exist after run with run_dir and execution_log_path"
    );
    let loaded =
        streamweave_attractor::execution_log_io::load_execution_log(&log_file).expect("load log");
    assert_eq!(loaded.version, 1);
    assert_eq!(loaded.goal, "resume-test");
    assert!(
        loaded.finished_at.is_some(),
        "completed run should have finished_at set"
    );
    let any_completed = !loaded.completed_nodes.is_empty();
    assert!(any_completed, "completed run should have completed_nodes");
}
/// Runs a minimal graph once, derives resume state from the resulting
/// execution log, and checks that a second run using that state reports
/// `exit` among its completed nodes.
#[tokio::test]
async fn resume_from_execution_log_completes() {
    let dot_source = r#"
digraph G {
graph [goal="resume-test"]
start [shape=Mdiamond]
exit [shape=Msquare]
start -> exit
}
"#;
    let graph = streamweave_attractor::dot_parser::parse_dot(dot_source).expect("parse dot");
    let run_dir = tempfile::tempdir().expect("temp dir");
    let log_file = run_dir
        .path()
        .join(streamweave_attractor::execution_log_io::EXECUTION_LOG_FILENAME);

    // First run: writes the execution log.
    let first_options = streamweave_attractor::RunOptions {
        run_dir: Some(run_dir.path()),
        resume_state: None,
        resume_already_completed: false,
        agent_cmd: None,
        stage_dir: None,
        execution_log_path: Some(log_file.clone()),
    };
    streamweave_attractor::run_compiled_graph(&graph, first_options)
        .await
        .expect("first run");

    // Rebuild the resume state from what the first run logged.
    let loaded =
        streamweave_attractor::execution_log_io::load_execution_log(&log_file).expect("load log");
    let exit_id = graph.find_exit().map(|node| node.id.as_str());
    let resume = streamweave_attractor::execution_log_io::resume_state_from_log(&loaded, exit_id)
        .expect("resume state from log");

    // Second run: resume with that state.
    let resume_options = streamweave_attractor::RunOptions {
        run_dir: Some(run_dir.path()),
        resume_state: Some(resume.resume_state),
        resume_already_completed: resume.already_completed,
        agent_cmd: None,
        stage_dir: None,
        execution_log_path: Some(log_file),
    };
    let resumed = streamweave_attractor::run_compiled_graph(&graph, resume_options)
        .await
        .expect("resume run");

    let exit_node = String::from("exit");
    assert!(
        resumed.completed_nodes.contains(&exit_node),
        "resume should complete through exit; completed: {:?}",
        resumed.completed_nodes
    );
}
/// Writes a hand-built partial execution log (only `start` completed, no
/// `finished_at`), derives resume state from it, and checks that the log is
/// not treated as already completed and that the resumed run reports
/// `exit` among its completed nodes.
#[tokio::test]
async fn resume_from_partial_log() {
    use std::collections::HashMap;
    let dot_source = r#"
digraph G {
graph [goal="partial-resume"]
start [shape=Mdiamond]
exit [shape=Msquare]
start -> exit
}
"#;
    let graph = streamweave_attractor::dot_parser::parse_dot(dot_source).expect("parse dot");
    let run_dir = tempfile::tempdir().expect("temp dir");
    let log_file = run_dir
        .path()
        .join(streamweave_attractor::execution_log_io::EXECUTION_LOG_FILENAME);

    // Context snapshot recorded with the single logged step.
    let step_context: HashMap<String, String> =
        HashMap::from([("goal".to_string(), "partial-resume".to_string())]);
    let start_step = streamweave_attractor::types::ExecutionStepEntry::new(
        1,
        "start",
        Some("start".to_string()),
        HashMap::new(),
        streamweave_attractor::types::NodeOutcome::success("ok"),
        step_context,
        Some("exit".to_string()),
        vec!["start".to_string()],
    );
    let unfinished_log = streamweave_attractor::types::ExecutionLog {
        version: 1,
        goal: "partial-resume".to_string(),
        started_at: "2026-02-14T10:00:00Z".to_string(),
        finished_at: None,
        final_status: String::new(),
        completed_nodes: vec!["start".to_string()],
        steps: vec![start_step],
    };
    streamweave_attractor::execution_log_io::write_execution_log_partial(
        &log_file,
        &unfinished_log,
    )
    .expect("write partial log");

    let loaded =
        streamweave_attractor::execution_log_io::load_execution_log(&log_file).expect("load log");
    let exit_id = graph.find_exit().map(|node| node.id.as_str());
    let resume = streamweave_attractor::execution_log_io::resume_state_from_log(&loaded, exit_id)
        .expect("resume state from partial log");
    assert!(
        !resume.already_completed,
        "partial log should not be already_completed"
    );

    let resume_options = streamweave_attractor::RunOptions {
        run_dir: Some(run_dir.path()),
        resume_state: Some(resume.resume_state),
        resume_already_completed: resume.already_completed,
        agent_cmd: None,
        stage_dir: None,
        execution_log_path: Some(log_file),
    };
    let resumed = streamweave_attractor::run_compiled_graph(&graph, resume_options)
        .await
        .expect("resume from partial log");

    let exit_node = String::from("exit");
    assert!(
        resumed.completed_nodes.contains(&exit_node),
        "resume from partial log should complete through exit; completed: {:?}",
        resumed.completed_nodes
    );
}
/// After a full run, the resume state derived from its log must be flagged
/// `already_completed`, and a second run given that state must report
/// `already_completed` as well.
#[tokio::test]
async fn resume_from_completed_log_returns_already_completed() {
    let dot_source = r#"
digraph G {
graph [goal="already-done"]
start [shape=Mdiamond]
exit [shape=Msquare]
start -> exit
}
"#;
    let graph = streamweave_attractor::dot_parser::parse_dot(dot_source).expect("parse dot");
    let run_dir = tempfile::tempdir().expect("temp dir");
    let log_file = run_dir
        .path()
        .join(streamweave_attractor::execution_log_io::EXECUTION_LOG_FILENAME);

    let first_options = streamweave_attractor::RunOptions {
        run_dir: Some(run_dir.path()),
        resume_state: None,
        resume_already_completed: false,
        agent_cmd: None,
        stage_dir: None,
        execution_log_path: Some(log_file.clone()),
    };
    streamweave_attractor::run_compiled_graph(&graph, first_options)
        .await
        .expect("first run");

    let loaded =
        streamweave_attractor::execution_log_io::load_execution_log(&log_file).expect("load log");
    let exit_id = graph.find_exit().map(|node| node.id.as_str());
    let resume = streamweave_attractor::execution_log_io::resume_state_from_log(&loaded, exit_id)
        .expect("resume state from completed log");
    assert!(
        resume.already_completed,
        "completed log should have already_completed true"
    );

    let second_options = streamweave_attractor::RunOptions {
        run_dir: Some(run_dir.path()),
        resume_state: Some(resume.resume_state),
        resume_already_completed: resume.already_completed,
        agent_cmd: None,
        stage_dir: None,
        execution_log_path: Some(log_file),
    };
    let second = streamweave_attractor::run_compiled_graph(&graph, second_options)
        .await
        .expect("second run with completed execution log");
    assert!(
        second.already_completed,
        "resume from completed log should return already_completed true"
    );
}
/// Upper bound (three seconds) that the timeout-guarded graph tests allow
/// `run_compiled_graph` before treating the run as hung.
const GRAPH_COMPLETION_TIMEOUT: std::time::Duration = std::time::Duration::from_millis(3_000);
/// A failing exec node routed to a "Fix" node on `outcome=fail` must let
/// the graph finish within `GRAPH_COMPLETION_TIMEOUT`, with `fix` among the
/// completed nodes.
#[tokio::test]
async fn tdd_codergen_error_path_graph_completes_within_timeout() {
    let dot_source = r#"
digraph TddCodergenError {
graph [goal="tdd codergen error path"]
start [shape=Mdiamond]
exit [shape=Msquare]
fail [type=exec, command="false"]
fix [label="Fix"]
start -> fail
fail -> fix [condition="outcome=fail"]
fix -> exit [condition="outcome=fail"]
}
"#;
    let graph = streamweave_attractor::dot_parser::parse_dot(dot_source).expect("parse dot");
    let options = streamweave_attractor::RunOptions {
        run_dir: None,
        resume_state: None,
        resume_already_completed: false,
        agent_cmd: None,
        stage_dir: None,
        execution_log_path: None,
    };
    let guarded = tokio::time::timeout(
        GRAPH_COMPLETION_TIMEOUT,
        streamweave_attractor::run_compiled_graph(&graph, options),
    )
    .await;
    assert!(
        guarded.is_ok(),
        "graph must complete within {:?} (codergen error path). \
         If this times out, the node that sent on the error port did not drop that sender.",
        GRAPH_COMPLETION_TIMEOUT
    );

    let run = guarded.unwrap().expect("run_compiled_graph");
    let fix_id = String::from("fix");
    assert!(
        run.completed_nodes.contains(&fix_id),
        "fix (CodergenNode) should have run and completed; completed: {:?}",
        run.completed_nodes
    );
}
/// With `agent_cmd` set to `true` and a temporary stage directory, a
/// start -> fix -> exit graph must finish within `GRAPH_COMPLETION_TIMEOUT`
/// and list `fix` among the completed nodes.
#[tokio::test]
async fn tdd_codergen_success_path_graph_completes_within_timeout() {
    let stage_dir = tempfile::tempdir().expect("temp stage dir");
    let dot_source = r#"
digraph TddCodergenSuccess {
graph [goal="tdd codergen success path"]
start [shape=Mdiamond]
exit [shape=Msquare]
fix [label="Fix"]
start -> fix
fix -> exit
}
"#;
    let graph = streamweave_attractor::dot_parser::parse_dot(dot_source).expect("parse dot");
    let options = streamweave_attractor::RunOptions {
        run_dir: None,
        resume_state: None,
        resume_already_completed: false,
        agent_cmd: Some("true".to_string()),
        stage_dir: Some(stage_dir.path().to_path_buf()),
        execution_log_path: None,
    };
    let guarded = tokio::time::timeout(
        GRAPH_COMPLETION_TIMEOUT,
        streamweave_attractor::run_compiled_graph(&graph, options),
    )
    .await;
    assert!(
        guarded.is_ok(),
        "graph must complete within {:?} (codergen success path). \
         If this times out, the node that sent on the out port did not drop that sender.",
        GRAPH_COMPLETION_TIMEOUT
    );

    let run = guarded.unwrap().expect("run_compiled_graph");
    let fix_id = String::from("fix");
    assert!(
        run.completed_nodes.contains(&fix_id),
        "fix (CodergenNode) should have run and completed; completed: {:?}",
        run.completed_nodes
    );
}
#[tokio::test]
async fn codergen_json_response_success_path() {
let stage_dir = tempfile::tempdir().expect("temp stage dir");
let json_path = stage_dir.path().join("out.json");
std::fs::write(&json_path, r#"{"outcome":"success","context_updates":{}}"#).expect("write json");
let dot = r#"
digraph TddJsonSuccess {
graph [goal="codergen JSON response"]
start [shape=Mdiamond]
exit [shape=Msquare]
fix [label="Fix"]
start -> fix
fix -> exit
}
"#;
let ast = streamweave_attractor::dot_parser::parse_dot(dot).expect("parse dot");
let agent_cmd = format!("cat {}", json_path.display());
let result = tokio::time::timeout(
std::time::Duration::from_secs(3),
streamweave_attractor::run_compiled_graph(
&ast,
streamweave_attractor::RunOptions {
run_dir: None,
resume_state: None,
resume_already_completed: false,
agent_cmd: Some(agent_cmd),
stage_dir: Some(stage_dir.path().to_path_buf()),
execution_log_path: None,
},
),
)
.await;
assert!(result.is_ok(), "graph must complete");
let r = result.unwrap().expect("run_compiled_graph");
assert!(
matches!(
r.last_outcome.status,
streamweave_attractor::types::OutcomeStatus::Success
),
"expected success when agent outputs outcome:success JSON, got {:?}",
r.last_outcome.status
);
assert!(r.completed_nodes.contains(&"fix".to_string()));
}
#[tokio::test]
async fn codergen_json_response_partial_success_path() {
let stage_dir = tempfile::tempdir().expect("temp stage dir");
let json_path = stage_dir.path().join("out.json");
std::fs::write(
&json_path,
r#"{"outcome":"partial_success","context_updates":{"key":"value"}}"#,
)
.expect("write json");
let dot = r#"
digraph TddJsonPartialSuccess {
graph [goal="codergen JSON partial_success"]
start [shape=Mdiamond]
exit [shape=Msquare]
fix [label="Fix"]
start -> fix
fix -> exit
}
"#;
let ast = streamweave_attractor::dot_parser::parse_dot(dot).expect("parse dot");
let agent_cmd = format!("cat {}", json_path.display());
let result = tokio::time::timeout(
std::time::Duration::from_secs(3),
streamweave_attractor::run_compiled_graph(
&ast,
streamweave_attractor::RunOptions {
run_dir: None,
resume_state: None,
resume_already_completed: false,
agent_cmd: Some(agent_cmd),
stage_dir: Some(stage_dir.path().to_path_buf()),
execution_log_path: None,
},
),
)
.await;
assert!(result.is_ok(), "graph must complete");
let r = result.unwrap().expect("run_compiled_graph");
assert!(
matches!(
r.last_outcome.status,
streamweave_attractor::types::OutcomeStatus::Success
),
"expected success when agent outputs outcome:partial_success JSON, got {:?}",
r.last_outcome.status
);
assert!(r.completed_nodes.contains(&"fix".to_string()));
}
/// When the agent command prints `{"outcome":"fail",...}` JSON, the run
/// must finish within `GRAPH_COMPLETION_TIMEOUT` with an `Error` last
/// outcome and `fix` among the completed nodes.
#[tokio::test]
async fn codergen_json_response_fail_path() {
    let stage_dir = tempfile::tempdir().expect("temp stage dir");
    let response_file = stage_dir.path().join("out.json");
    let response_json = r#"{"outcome":"fail","context_updates":{}}"#;
    std::fs::write(&response_file, response_json).expect("write json");
    let dot_source = r#"
digraph TddJsonFail {
graph [goal="codergen JSON fail"]
start [shape=Mdiamond]
exit [shape=Msquare]
fix [label="Fix"]
start -> fix
fix -> exit [condition="outcome=success"]
fix -> exit [condition="outcome=fail"]
}
"#;
    let graph = streamweave_attractor::dot_parser::parse_dot(dot_source).expect("parse dot");
    // The "agent" is `cat` on the prepared file, so its stdout is the JSON above.
    let options = streamweave_attractor::RunOptions {
        run_dir: None,
        resume_state: None,
        resume_already_completed: false,
        agent_cmd: Some(format!("cat {}", response_file.display())),
        stage_dir: Some(stage_dir.path().to_path_buf()),
        execution_log_path: None,
    };
    let guarded = tokio::time::timeout(
        GRAPH_COMPLETION_TIMEOUT,
        streamweave_attractor::run_compiled_graph(&graph, options),
    )
    .await;
    assert!(guarded.is_ok(), "graph must complete");

    let run = guarded.unwrap().expect("run_compiled_graph");
    let ended_in_error = matches!(
        run.last_outcome.status,
        streamweave_attractor::types::OutcomeStatus::Error
    );
    assert!(
        ended_in_error,
        "expected error when agent outputs outcome:fail JSON, got {:?}",
        run.last_outcome.status
    );
    let fix_id = String::from("fix");
    assert!(run.completed_nodes.contains(&fix_id));
}
/// A graph whose only exec node runs `false` (with both success and fail
/// edges to `exit`) must finish within `GRAPH_COMPLETION_TIMEOUT`.
#[tokio::test]
async fn tdd_exec_error_path_graph_completes_within_timeout() {
    let dot_source = r#"
digraph TddExecError {
graph [goal="tdd exec error path"]
start [shape=Mdiamond]
exit [shape=Msquare]
fail [type=exec, command="false"]
start -> fail
fail -> exit [condition="outcome=success"]
fail -> exit [condition="outcome=fail"]
}
"#;
    let graph = streamweave_attractor::dot_parser::parse_dot(dot_source).expect("parse dot");
    let options = streamweave_attractor::RunOptions {
        run_dir: None,
        resume_state: None,
        resume_already_completed: false,
        agent_cmd: None,
        stage_dir: None,
        execution_log_path: None,
    };
    let guarded = tokio::time::timeout(
        GRAPH_COMPLETION_TIMEOUT,
        streamweave_attractor::run_compiled_graph(&graph, options),
    )
    .await;
    assert!(
        guarded.is_ok(),
        "graph must complete within {:?} (exec error path). \
         If this times out, ExecNode did not drop the error port sender after send.",
        GRAPH_COMPLETION_TIMEOUT
    );
}
/// A cyclic graph (`check -> middle -> loop_back -> check`) whose `check`
/// exec node runs `false` and exits on `outcome=fail` must finish within
/// `GRAPH_COMPLETION_TIMEOUT`.
#[tokio::test]
async fn tdd_cyclic_exec_error_path_graph_completes_within_timeout() {
    let dot_source = r#"
digraph TddCyclicExecError {
graph [goal="tdd cyclic exec error path"]
start [shape=Mdiamond]
exit [shape=Msquare]
check [type=exec, command="false", label="Check"]
middle [type=exec, command="true", label="Middle"]
loop_back [type=exec, command="true", label="LoopBack"]
start -> check
loop_back -> check
check -> exit [condition="outcome=fail"]
check -> middle [condition="outcome=success"]
middle -> loop_back
}
"#;
    let graph = streamweave_attractor::dot_parser::parse_dot(dot_source).expect("parse dot");
    let options = streamweave_attractor::RunOptions {
        run_dir: None,
        resume_state: None,
        resume_already_completed: false,
        agent_cmd: None,
        stage_dir: None,
        execution_log_path: None,
    };
    let guarded = tokio::time::timeout(
        GRAPH_COMPLETION_TIMEOUT,
        streamweave_attractor::run_compiled_graph(&graph, options),
    )
    .await;
    assert!(
        guarded.is_ok(),
        "cyclic graph must complete within {:?} (exec error path). \
         If this times out, ExecNode did not break after error send, causing MergeNode deadlock.",
        GRAPH_COMPLETION_TIMEOUT
    );
}
/// A start -> ok -> exit graph whose exec node runs `true` must finish
/// within `GRAPH_COMPLETION_TIMEOUT`.
#[tokio::test]
async fn tdd_exec_success_path_graph_completes_within_timeout() {
    let dot_source = r#"
digraph TddExecSuccess {
graph [goal="tdd exec success path"]
start [shape=Mdiamond]
exit [shape=Msquare]
ok [type=exec, command="true"]
start -> ok
ok -> exit
}
"#;
    let graph = streamweave_attractor::dot_parser::parse_dot(dot_source).expect("parse dot");
    let options = streamweave_attractor::RunOptions {
        run_dir: None,
        resume_state: None,
        resume_already_completed: false,
        agent_cmd: None,
        stage_dir: None,
        execution_log_path: None,
    };
    let guarded = tokio::time::timeout(
        GRAPH_COMPLETION_TIMEOUT,
        streamweave_attractor::run_compiled_graph(&graph, options),
    )
    .await;
    assert!(
        guarded.is_ok(),
        "graph must complete within {:?} (exec success path). \
         If this times out, ExecNode did not drop the out port sender after send.",
        GRAPH_COMPLETION_TIMEOUT
    );
}
/// A cyclic graph (`check -> middle -> loop_back -> check`) whose `check`
/// node has no exec command and exits on `outcome=fail` must finish within
/// `GRAPH_COMPLETION_TIMEOUT` when no agent command is configured.
#[tokio::test]
async fn tdd_cyclic_codergen_error_path_graph_completes_within_timeout() {
    let dot_source = r#"
digraph TddCyclicCodergenError {
graph [goal="tdd cyclic codergen error path"]
start [shape=Mdiamond]
exit [shape=Msquare]
check [label="Check"]
middle [type=exec, command="true", label="Middle"]
loop_back [type=exec, command="true", label="LoopBack"]
start -> check
loop_back -> check
check -> exit [condition="outcome=fail"]
check -> middle [condition="outcome=success"]
middle -> loop_back
}
"#;
    let graph = streamweave_attractor::dot_parser::parse_dot(dot_source).expect("parse dot");
    let options = streamweave_attractor::RunOptions {
        run_dir: None,
        resume_state: None,
        resume_already_completed: false,
        agent_cmd: None,
        stage_dir: None,
        execution_log_path: None,
    };
    let guarded = tokio::time::timeout(
        GRAPH_COMPLETION_TIMEOUT,
        streamweave_attractor::run_compiled_graph(&graph, options),
    )
    .await;
    assert!(
        guarded.is_ok(),
        "cyclic graph must complete within {:?} (codergen error path). If this times out, CodergenNode did not break after error send.",
        GRAPH_COMPLETION_TIMEOUT
    );
}