use clap::Parser;
use std::env;
use std::fs;
use std::path::PathBuf;
use std::process;
use std::time::Duration;
use streamweave_attractor::{
AttractorResult, DEFAULT_STAGE_DIR, RunOptions, RunSummaryOutput, dot_parser, execution_log_io,
run_compiled_graph, run_summary_output_from_log, validate_attractor_graph,
};
use tracing::info;
use tracing_subscriber::{EnvFilter, fmt::format::FmtSpan};
#[derive(Parser, Debug)]
#[command(name = "run_dot")]
#[command(
after_help = r#"Environment variables (override --agent-cmd and --stage-dir when set):
ATTRACTOR_AGENT_CMD Command for agent/codergen nodes (e.g. cursor-agent). When set, agent steps
run this with prompt as stdin; agent runs with stage dir.
ATTRACTOR_STAGE_DIR Directory for staging (default: .attractor).
ATTRACTOR_EXECUTION_LOG Unset=off. 1 or true=write execution log to <stage_dir>/execution.log.json.
Any other value=path to execution log file. Overridden by --execution-log.
Run state (execution log only):
--run-dir DIR Run directory; with --execution-log, log is written to DIR/execution.log.json.
--resume DIR Resume from DIR/execution.log.json (same .dot file).
--summary Print summary of last run from execution log (outcome, step count, duration). Use with optional --run-dir.
--timeout SECS Maximum run time in seconds; on timeout exit with 124.
After a run, a one-line summary is printed: Nodes: N, Success: S, Failed: F, Duration: D.
Examples:
run_dot examples/workflows/pre-push.dot
run_dot --run-dir .attractor_run examples/workflows/pre-push.dot
run_dot --resume .attractor_run examples/workflows/pre-push.dot
run_dot --stage-dir /tmp/stage examples/workflows/pre-push.dot
run_dot --execution-log /tmp/execution.log.json examples/workflows/pre-push.dot
run_dot --timeout 300 examples/workflows/pre-push.dot
"#
)]
struct Args {
#[arg(long, value_name = "CMD")]
agent_cmd: Option<String>,
#[arg(long, value_name = "DIR", default_value = DEFAULT_STAGE_DIR)]
stage_dir: PathBuf,
#[arg(long = "run-dir", value_name = "DIR")]
run_dir: Option<PathBuf>,
#[arg(long = "resume", value_name = "DIR")]
resume: Option<PathBuf>,
#[arg(long = "execution-log", value_name = "PATH", num_args = 0..=1)]
execution_log: Option<Option<PathBuf>>,
#[arg(long = "dry-run", default_value_t = false)]
dry_run: bool,
#[arg(long, value_name = "SECS")]
timeout: Option<u64>,
#[arg(long = "summary", default_value_t = false)]
summary: bool,
#[arg(long = "json", default_value_t = false)]
json: bool,
#[arg(value_name = "path-to-dot-file")]
dot_path: PathBuf,
}
#[tokio::main]
async fn main() {
tracing::trace!("run_dot main entered");
tracing_subscriber::fmt()
.with_env_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
.with_span_events(FmtSpan::ENTER | FmtSpan::EXIT)
.init();
info!("run_dot starting");
let args = Args::parse();
let agent_cmd = env::var("ATTRACTOR_AGENT_CMD")
.ok()
.or_else(|| args.agent_cmd.clone());
let stage_dir = env::var("ATTRACTOR_STAGE_DIR")
.ok()
.map(PathBuf::from)
.or_else(|| Some(args.stage_dir.clone()))
.unwrap_or_else(|| PathBuf::from(DEFAULT_STAGE_DIR));
let run_dir = args
.run_dir
.clone()
.unwrap_or_else(|| PathBuf::from(DEFAULT_STAGE_DIR));
let run_dir_for_options = Some(run_dir.as_path());
let default_execution_log_path = stage_dir.join("execution.log.json");
let execution_log_from_env = env::var("ATTRACTOR_EXECUTION_LOG").ok().map(|v| {
let v = v.trim();
if v == "1" || v.eq_ignore_ascii_case("true") {
default_execution_log_path.clone()
} else {
PathBuf::from(v)
}
});
let execution_log_path = args
.execution_log
.map(|opt| opt.unwrap_or(default_execution_log_path))
.or(execution_log_from_env)
.or_else(|| {
args
.resume
.as_ref()
.map(|d| d.join(execution_log_io::EXECUTION_LOG_FILENAME))
});
info!(agent_cmd = ?agent_cmd, stage_dir = %stage_dir.display(), run_dir = %run_dir.display(), resume = args.resume.is_some(), execution_log_path = ?execution_log_path, "options (env or flags)");
if args.summary {
let log_path = run_dir.join(execution_log_io::EXECUTION_LOG_FILENAME);
let log = match execution_log_io::load_execution_log(&log_path) {
Ok(l) => l,
Err(e) => {
eprintln!(
"Error loading execution log from {}: {}",
log_path.display(),
e
);
process::exit(1);
}
};
let outcome = &log.final_status;
if args.json {
let out = run_summary_output_from_log(&log);
println!(
"{}",
serde_json::to_string(&out).expect("RunSummaryOutput serialization")
);
process::exit(if outcome == "success" { 0 } else { 1 });
}
let step_count = log.steps.len();
let duration_msg = match (log.started_at.as_str(), log.finished_at.as_deref()) {
(start, Some(finish)) => {
match (
chrono::DateTime::parse_from_rfc3339(start),
chrono::DateTime::parse_from_rfc3339(finish),
) {
(Ok(t0), Ok(t1)) => {
let d = t1.signed_duration_since(t0);
format!("{}s", d.num_seconds())
}
_ => "unknown".to_string(),
}
}
_ => "incomplete (no finished_at)".to_string(),
};
println!("outcome: {}", outcome);
println!("steps: {}", step_count);
println!("duration: {}", duration_msg);
process::exit(if outcome == "success" { 0 } else { 1 });
}
let path = &args.dot_path;
let dot = match fs::read_to_string(path) {
Ok(s) => s,
Err(e) => {
eprintln!("Error reading {}: {}", path.display(), e);
process::exit(1);
}
};
let ast = match dot_parser::parse_dot(&dot) {
Ok(a) => a,
Err(e) => {
eprintln!("Error parsing DOT: {}", e);
process::exit(1);
}
};
if args.dry_run {
match validate_attractor_graph(&ast) {
Ok(()) => {
println!("Dry-run: parse and validate OK.");
process::exit(0);
}
Err(e) => {
eprintln!("Dry-run validation failed: {}", e);
process::exit(1);
}
}
}
let (resume_state, resume_already_completed) =
args.resume.as_ref().map_or((None, false), |dir| {
let log_path = dir.join(execution_log_io::EXECUTION_LOG_FILENAME);
if !log_path.exists() {
eprintln!(
"Error: no execution log at {}. Resume requires {} (execution log is the only run state).",
log_path.display(),
execution_log_io::EXECUTION_LOG_FILENAME
);
process::exit(1);
}
let log = execution_log_io::load_execution_log(&log_path).unwrap_or_else(|e| {
eprintln!("Error loading execution log from {}: {}", log_path.display(), e);
process::exit(1);
});
let exit_id = ast.find_exit().map(|n| n.id.as_str());
match execution_log_io::resume_state_from_log(&log, exit_id) {
Some(r) => (Some(r.resume_state), r.already_completed),
None => {
eprintln!(
"Error: execution log at {} has no steps and no finished_at; cannot resume.",
log_path.display()
);
process::exit(1);
}
}
});
let options = RunOptions {
run_dir: run_dir_for_options,
resume_state,
resume_already_completed,
agent_cmd,
stage_dir: Some(stage_dir),
execution_log_path,
};
let run_future = run_compiled_graph(&ast, options);
let run_result: Result<AttractorResult, String> = match args.timeout {
Some(secs) => {
let duration = Duration::from_secs(secs);
match tokio::time::timeout(duration, run_future).await {
Ok(Ok(res)) => Ok(res),
Ok(Err(e)) => {
eprintln!("Pipeline error: {}", e);
process::exit(1);
}
Err(_) => {
eprintln!("Pipeline run timed out after {} seconds.", secs);
process::exit(124);
}
}
}
None => match run_future.await {
Ok(res) => Ok(res),
Err(e) => {
eprintln!("Pipeline error: {}", e);
process::exit(1);
}
},
};
let r = run_result.expect("pipeline error and timeout paths exit above");
info!(status = ?r.last_outcome.status, nodes = ?r.completed_nodes, "pipeline completed");
if args.json {
let out: RunSummaryOutput = if let Some(ref s) = r.run_summary {
s.into()
} else if r.already_completed {
let log_path = run_dir.join(execution_log_io::EXECUTION_LOG_FILENAME);
match execution_log_io::load_execution_log(&log_path) {
Ok(log) => run_summary_output_from_log(&log),
Err(_) => RunSummaryOutput {
outcome: if format!("{:?}", r.last_outcome.status) == "Success" {
"success"
} else {
"fail"
}
.to_string(),
nodes_run: r.completed_nodes.len(),
success_count: r.completed_nodes.len(),
failed_count: 0,
started_at: String::new(),
finished_at: String::new(),
duration_secs: None,
},
}
} else {
RunSummaryOutput {
outcome: if format!("{:?}", r.last_outcome.status) == "Success" {
"success"
} else {
"fail"
}
.to_string(),
nodes_run: r.completed_nodes.len(),
success_count: r.completed_nodes.len(),
failed_count: 0,
started_at: String::new(),
finished_at: String::new(),
duration_secs: None,
}
};
println!(
"{}",
serde_json::to_string(&out).expect("RunSummaryOutput serialization")
);
process::exit(if out.outcome == "success" { 0 } else { 1 });
}
if r.already_completed {
println!("Pipeline already completed (execution log). Nothing to resume.");
} else {
println!("Pipeline completed.");
}
if let Some(ref s) = r.run_summary {
let duration = s
.duration_secs()
.map(|secs| format!("{}s", secs))
.unwrap_or_else(|| "?".to_string());
println!(
" Nodes: {}, Success: {}, Failed: {}, Duration: {}",
s.nodes_run, s.success_count, s.failed_count, duration
);
}
println!(" Status: {:?}", r.last_outcome.status);
println!(" Notes: {:?}", r.last_outcome.notes);
println!(" Completed nodes: {:?}", r.completed_nodes);
if format!("{:?}", r.last_outcome.status) != "Success" {
process::exit(1);
}
}