// streamweave-attractor 0.3.0
//
// Attractor pipeline as a StreamWeave graph
// Documentation
//! CLI: Run an Attractor pipeline from a .dot file.
//!
//! Uses the compiled pipeline: parse DOT → validate → run (real exec, plus agent steps when ATTRACTOR_AGENT_CMD or --agent-cmd is set).
//! Supports fix-and-retry cycles via the runner loop.
//!
//! Usage: `run_dot [OPTIONS] <path-to-dot-file>`
//! Example: run_dot examples/workflows/pre-push.dot
//!
//! With --run-dir DIR, execution log is written to DIR/execution.log.json when --execution-log is used.
//! With --resume DIR, run resumes from DIR/execution.log.json (same .dot file). Execution log is the only persisted run state.
//!
//! Set RUST_LOG=streamweave_attractor=trace for TRACE-level span enter/exit and events.

use clap::Parser;
use std::env;
use std::fs;
use std::path::PathBuf;
use std::process;
use std::time::Duration;
use streamweave_attractor::{
  AttractorResult, DEFAULT_STAGE_DIR, RunOptions, RunSummaryOutput, dot_parser, execution_log_io,
  run_compiled_graph, run_summary_output_from_log, validate_attractor_graph,
};
use tracing::info;
use tracing_subscriber::{EnvFilter, fmt::format::FmtSpan};

/// Run an Attractor pipeline from a .dot file.
///
/// Environment variables (see --help for ATTRACTOR_AGENT_CMD and ATTRACTOR_STAGE_DIR).
// NOTE: in clap derive, the `///` doc comments on this struct and its fields ARE the --help
// text shown to users; edit them as user-facing strings. Maintainer notes use `//`.
#[derive(Parser, Debug)]
#[command(name = "run_dot")]
#[command(
  after_help = r#"Environment variables (override --agent-cmd and --stage-dir when set):
  ATTRACTOR_AGENT_CMD      Command for agent/codergen nodes (e.g. cursor-agent). When set, agent steps
                           run this with prompt as stdin; agent runs with stage dir.
  ATTRACTOR_STAGE_DIR      Directory for staging (default: .attractor).
  ATTRACTOR_EXECUTION_LOG  Unset=off. 1 or true=write execution log to <stage_dir>/execution.log.json.
                           Any other value=path to execution log file. Overridden by --execution-log.

Run state (execution log only):
  --run-dir DIR   Run directory; with --execution-log, log is written to DIR/execution.log.json.
  --resume DIR    Resume from DIR/execution.log.json (same .dot file).
  --summary       Print summary of last run from execution log (outcome, step count, duration). Use with optional --run-dir.

  --timeout SECS  Maximum run time in seconds; on timeout exit with 124.

After a run, a one-line summary is printed: Nodes: N, Success: S, Failed: F, Duration: D.

Examples:
  run_dot examples/workflows/pre-push.dot
  run_dot --run-dir .attractor_run examples/workflows/pre-push.dot
  run_dot --resume .attractor_run examples/workflows/pre-push.dot
  run_dot --stage-dir /tmp/stage examples/workflows/pre-push.dot
  run_dot --execution-log /tmp/execution.log.json examples/workflows/pre-push.dot
  run_dot --timeout 300 examples/workflows/pre-push.dot
  "#
)]
struct Args {
  // Raw flag value; `main` lets ATTRACTOR_AGENT_CMD take precedence over it.
  /// Command for agent/codergen nodes (e.g. cursor-agent). Overridden by ATTRACTOR_AGENT_CMD if set.
  #[arg(long, value_name = "CMD")]
  agent_cmd: Option<String>,

  // Always populated (clap default); `main` lets ATTRACTOR_STAGE_DIR take precedence over it.
  /// Directory for staging. Overridden by ATTRACTOR_STAGE_DIR if set. Default: .attractor
  #[arg(long, value_name = "DIR", default_value = DEFAULT_STAGE_DIR)]
  stage_dir: PathBuf,

  /// Run directory; with --execution-log, execution log is written to DIR/execution.log.json. Enables resume with --resume DIR.
  #[arg(long = "run-dir", value_name = "DIR")]
  run_dir: Option<PathBuf>,

  /// Resume from DIR/execution.log.json. Use same .dot file as initial run.
  #[arg(long = "resume", value_name = "DIR")]
  resume: Option<PathBuf>,

  // Three states: None = flag absent, Some(None) = bare `--execution-log` (use the default
  // location), Some(Some(p)) = explicit path. Because the value is optional, a bare
  // `--execution-log` placed directly before the positional .dot path will consume that path
  // as its value; put the flag after the .dot path or give it an explicit PATH.
  /// Write execution log to PATH (default: <stage_dir>/execution.log.json). Overrides ATTRACTOR_EXECUTION_LOG.
  #[arg(long = "execution-log", value_name = "PATH", num_args = 0..=1)]
  execution_log: Option<Option<PathBuf>>,

  /// Parse and validate the workflow file only; do not run the pipeline.
  #[arg(long = "dry-run", default_value_t = false)]
  dry_run: bool,

  /// Maximum run time in seconds. If exceeded, exit with 124.
  #[arg(long, value_name = "SECS")]
  timeout: Option<u64>,

  /// Print a short summary of the last run from an execution log (outcome, step count, duration). Uses --run-dir or default stage dir. Does not run the pipeline.
  #[arg(long = "summary", default_value_t = false)]
  summary: bool,

  /// Emit a single JSON object to stdout (outcome, nodes_run, success_count, failed_count, started_at, finished_at, duration_secs). Use with a run or with --summary. Suppresses human-readable summary.
  #[arg(long = "json", default_value_t = false)]
  json: bool,

  // Required by clap even for --summary, which never reads the file.
  /// Path to the .dot workflow file
  #[arg(value_name = "path-to-dot-file")]
  dot_path: PathBuf,
}

/// CLI entry point for `run_dot`.
///
/// Resolves options (environment variables override the matching flags), then does exactly one
/// of the following:
/// - prints a summary of a previous run from its execution log (`--summary`);
/// - parses and validates only (`--dry-run`);
/// - runs the compiled pipeline, optionally resuming from an execution log (`--resume`) and
///   bounded by `--timeout`.
///
/// Exit codes: 0 on success; 1 on any error or a non-success outcome; 124 when `--timeout`
/// elapses.
#[tokio::main]
async fn main() {
  tracing_subscriber::fmt()
    .with_env_filter(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
    .with_span_events(FmtSpan::ENTER | FmtSpan::EXIT)
    .init();
  // Emitted only after the subscriber is installed; before `init()` the event would be dropped.
  tracing::trace!("run_dot main entered");

  info!("run_dot starting");
  let args = Args::parse();

  // Env vars override flags. These are the values used by the program (not read from env again).
  // Blank env values count as unset so an empty export cannot yield an empty command or path.
  let agent_cmd = non_empty_env("ATTRACTOR_AGENT_CMD").or_else(|| args.agent_cmd.clone());
  let stage_dir = non_empty_env("ATTRACTOR_STAGE_DIR")
    .map(PathBuf::from)
    .unwrap_or_else(|| args.stage_dir.clone());

  // Run directory handed to the runner (defaults to DEFAULT_STAGE_DIR when --run-dir is absent).
  let run_dir = args
    .run_dir
    .clone()
    .unwrap_or_else(|| PathBuf::from(DEFAULT_STAGE_DIR));
  let run_dir_for_options = Some(run_dir.as_path());

  // Default execution-log location: DIR/execution.log.json for --run-dir DIR (as the help text
  // documents), otherwise <stage_dir>/execution.log.json. --summary reads from here too, so a
  // summary finds the log that a run with the same --run-dir/--stage-dir wrote.
  let default_execution_log_path = args
    .run_dir
    .as_deref()
    .unwrap_or(stage_dir.as_path())
    .join(execution_log_io::EXECUTION_LOG_FILENAME);

  let execution_log_from_env = non_empty_env("ATTRACTOR_EXECUTION_LOG").map(|v| {
    let v = v.trim();
    if v == "1" || v.eq_ignore_ascii_case("true") {
      default_execution_log_path.clone()
    } else {
      PathBuf::from(v)
    }
  });
  // Precedence: --execution-log [PATH] > ATTRACTOR_EXECUTION_LOG > the --resume directory's log.
  let execution_log_path = match &args.execution_log {
    Some(Some(path)) => Some(path.clone()),
    Some(None) => Some(default_execution_log_path.clone()),
    None => execution_log_from_env.or_else(|| {
      args
        .resume
        .as_ref()
        .map(|d| d.join(execution_log_io::EXECUTION_LOG_FILENAME))
    }),
  };

  info!(agent_cmd = ?agent_cmd, stage_dir = %stage_dir.display(), run_dir = %run_dir.display(), resume = args.resume.is_some(), execution_log_path = ?execution_log_path, "options (env or flags)");

  if args.summary {
    report_log_summary(default_execution_log_path.clone(), args.json);
  }

  let path = &args.dot_path;
  let dot = match fs::read_to_string(path) {
    Ok(s) => s,
    Err(e) => {
      eprintln!("Error reading {}: {}", path.display(), e);
      process::exit(1);
    }
  };

  let ast = match dot_parser::parse_dot(&dot) {
    Ok(a) => a,
    Err(e) => {
      eprintln!("Error parsing DOT: {}", e);
      process::exit(1);
    }
  };

  if args.dry_run {
    match validate_attractor_graph(&ast) {
      Ok(()) => {
        println!("Dry-run: parse and validate OK.");
        process::exit(0);
      }
      Err(e) => {
        eprintln!("Dry-run validation failed: {}", e);
        process::exit(1);
      }
    }
  }

  // Resuming: load the execution log once. It is kept so that a run which was already complete
  // can be summarized (--json) from the same log instead of re-reading a possibly different dir.
  let (resume_state, resume_already_completed, resume_log) = match args.resume.as_ref() {
    None => (None, false, None),
    Some(dir) => {
      let log_path = dir.join(execution_log_io::EXECUTION_LOG_FILENAME);
      if !log_path.exists() {
        eprintln!(
          "Error: no execution log at {}. Resume requires {} (execution log is the only run state).",
          log_path.display(),
          execution_log_io::EXECUTION_LOG_FILENAME
        );
        process::exit(1);
      }
      let log = match execution_log_io::load_execution_log(&log_path) {
        Ok(l) => l,
        Err(e) => {
          eprintln!("Error loading execution log from {}: {}", log_path.display(), e);
          process::exit(1);
        }
      };
      let exit_id = ast.find_exit().map(|n| n.id.as_str());
      let (state, already_completed) =
        match execution_log_io::resume_state_from_log(&log, exit_id) {
          Some(r) => (r.resume_state, r.already_completed),
          None => {
            eprintln!(
              "Error: execution log at {} has no steps and no finished_at; cannot resume.",
              log_path.display()
            );
            process::exit(1);
          }
        };
      (Some(state), already_completed, Some(log))
    }
  };

  let options = RunOptions {
    run_dir: run_dir_for_options,
    resume_state,
    resume_already_completed,
    agent_cmd,
    stage_dir: Some(stage_dir),
    execution_log_path,
  };

  let run_future = run_compiled_graph(&ast, options);
  // Timeout exits immediately with 124; otherwise we get the pipeline's own result.
  let run_result = match args.timeout {
    Some(secs) => match tokio::time::timeout(Duration::from_secs(secs), run_future).await {
      Ok(res) => res,
      Err(_) => {
        eprintln!("Pipeline run timed out after {} seconds.", secs);
        process::exit(124);
      }
    },
    None => run_future.await,
  };
  let r: AttractorResult = match run_result {
    Ok(res) => res,
    Err(e) => {
      eprintln!("Pipeline error: {}", e);
      process::exit(1);
    }
  };

  info!(status = ?r.last_outcome.status, nodes = ?r.completed_nodes, "pipeline completed");

  if args.json {
    let out: RunSummaryOutput = if let Some(ref s) = r.run_summary {
      s.into()
    } else if r.already_completed {
      match resume_log.as_ref() {
        Some(log) => run_summary_output_from_log(log),
        None => match execution_log_io::load_execution_log(&default_execution_log_path) {
          Ok(log) => run_summary_output_from_log(&log),
          Err(_) => fallback_summary_output(&r),
        },
      }
    } else {
      fallback_summary_output(&r)
    };
    println!(
      "{}",
      serde_json::to_string(&out).expect("RunSummaryOutput serialization")
    );
    process::exit(if out.outcome == "success" { 0 } else { 1 });
  }

  if r.already_completed {
    println!("Pipeline already completed (execution log). Nothing to resume.");
  } else {
    println!("Pipeline completed.");
  }
  if let Some(ref s) = r.run_summary {
    let duration = s
      .duration_secs()
      .map(|secs| format!("{}s", secs))
      .unwrap_or_else(|| "?".to_string());
    println!(
      "  Nodes: {}, Success: {}, Failed: {}, Duration: {}",
      s.nodes_run, s.success_count, s.failed_count, duration
    );
  }
  println!("  Status: {:?}", r.last_outcome.status);
  println!("  Notes: {:?}", r.last_outcome.notes);
  println!("  Completed nodes: {:?}", r.completed_nodes);
  if !status_is_success(&r) {
    process::exit(1);
  }
}

/// Reads environment variable `name`, treating unset, non-UTF-8, and blank values as absent.
/// The returned value is not trimmed.
fn non_empty_env(name: &str) -> Option<String> {
  env::var(name).ok().filter(|v| !v.trim().is_empty())
}

/// Handles `--summary`: loads the execution log at `log_path` and prints its outcome, step
/// count, and duration, or a single JSON object when `json` is set. It then exits the process
/// with 0 if the logged outcome is "success", and 1 otherwise. It also exits with 1 if the log
/// cannot be loaded.
fn report_log_summary(log_path: PathBuf, json: bool) -> ! {
  let log = match execution_log_io::load_execution_log(&log_path) {
    Ok(l) => l,
    Err(e) => {
      eprintln!(
        "Error loading execution log from {}: {}",
        log_path.display(),
        e
      );
      process::exit(1);
    }
  };
  let outcome = &log.final_status;
  let exit_code = if outcome == "success" { 0 } else { 1 };

  if json {
    let out = run_summary_output_from_log(&log);
    println!(
      "{}",
      serde_json::to_string(&out).expect("RunSummaryOutput serialization")
    );
    process::exit(exit_code);
  }

  // Wall-clock duration needs both RFC 3339 timestamps; a missing finish means the run never ended.
  let duration_msg = match log.finished_at.as_deref() {
    Some(finish) => match (
      chrono::DateTime::parse_from_rfc3339(log.started_at.as_str()),
      chrono::DateTime::parse_from_rfc3339(finish),
    ) {
      (Ok(t0), Ok(t1)) => format!("{}s", t1.signed_duration_since(t0).num_seconds()),
      _ => "unknown".to_string(),
    },
    None => "incomplete (no finished_at)".to_string(),
  };
  println!("outcome: {}", outcome);
  println!("steps: {}", log.steps.len());
  println!("duration: {}", duration_msg);
  process::exit(exit_code);
}

/// Returns true when the pipeline's final outcome status is the `Success` variant.
// NOTE(review): compares the Debug rendering because the status type is not imported here;
// matching on the enum variant directly would be sturdier if that type is exported.
fn status_is_success(r: &AttractorResult) -> bool {
  format!("{:?}", r.last_outcome.status) == "Success"
}

/// Builds a best-effort JSON summary from the in-memory result when neither a run summary nor
/// an execution log is available. The outcome comes from the final status, every completed
/// node counts as a success, and timestamps are left empty.
fn fallback_summary_output(r: &AttractorResult) -> RunSummaryOutput {
  RunSummaryOutput {
    outcome: if status_is_success(r) {
      "success"
    } else {
      "fail"
    }
    .to_string(),
    nodes_run: r.completed_nodes.len(),
    success_count: r.completed_nodes.len(),
    failed_count: 0,
    started_at: String::new(),
    finished_at: String::new(),
    duration_secs: None,
  }
}