Skip to main content

streamweave_attractor/
runner.rs

1//! Compiled graph runner: compile AttractorGraph to StreamWeave graph and run it.
2//!
3//! - [run_streamweave_graph]: run a compiled graph (one trigger in, first output out).
4//! - [run_compiled_graph]: compile AST then run, return [AttractorResult].
5
6use crate::execution_log_io::{load_execution_log, resume_state_from_log};
7use crate::types::{
8  AttractorGraph, AttractorResult, ExecutionLog, GraphPayload, NodeOutcome, OutcomeStatus,
9  ResumeState, RunSummary,
10};
11use std::path::Path;
12use std::sync::Arc;
13use tracing::instrument;
14/// Runs a compiled StreamWeave graph: feeds one trigger into the "input" port,
15/// runs until the graph produces output on the "output" port, then returns the first output item.
16///
17/// The graph must have been built with `input` and `output` port names (as produced by
18/// [crate::compile_attractor_graph].
19#[instrument(level = "trace", skip(graph, initial))]
20pub async fn run_streamweave_graph(
21  mut graph: streamweave::graph::Graph,
22  initial: GraphPayload,
23) -> Result<Option<Arc<dyn std::any::Any + Send + Sync>>, String> {
24  let (tx_in, rx_in) = tokio::sync::mpsc::channel(1);
25  let (_tx_out, mut rx_out) = tokio::sync::mpsc::channel(16);
26
27  graph
28    .connect_input_channel("input", rx_in)
29    .map_err(|e| e.to_string())?;
30  graph
31    .connect_output_channel("output", _tx_out)
32    .map_err(|e| e.to_string())?;
33
34  tx_in
35    .send(Arc::new(initial) as Arc<dyn std::any::Any + Send + Sync>)
36    .await
37    .map_err(|e| e.to_string())?;
38  drop(tx_in);
39
40  tracing::trace!("run_streamweave_graph: calling graph.execute()");
41  graph.execute().await.map_err(|e| e.to_string())?;
42  tracing::trace!("run_streamweave_graph: execute done, waiting for output on rx_out.recv()");
43  let first = rx_out.recv().await;
44  tracing::trace!("run_streamweave_graph: received output, calling wait_for_completion()");
45  graph
46    .wait_for_completion()
47    .await
48    .map_err(|e| e.to_string())?;
49  Ok(first)
50}
51
52/// Options for [run_compiled_graph].
53pub struct RunOptions<'a> {
54  /// If set, used to derive execution log path when [Self::execution_log_path] is set (e.g. run_dir/execution.log.json).
55  pub run_dir: Option<&'a Path>,
56  /// If set, run resumes from this state (from execution log only, no checkpoint.json).
57  pub resume_state: Option<ResumeState>,
58  /// When true and [Self::resume_state] is set, skip running and return already_completed (e.g. from execution log with finished_at).
59  pub resume_already_completed: bool,
60  /// Command for agent/codergen nodes (e.g. cursor-agent). Required if the graph has codergen nodes.
61  pub agent_cmd: Option<String>,
62  /// Directory for agent staging.
63  pub stage_dir: Option<std::path::PathBuf>,
64  /// If set, execution log is written to this path after the run (success or failure).
65  pub execution_log_path: Option<std::path::PathBuf>,
66}
67
68/// Writes execution.log.json to the given path (on both success and failure).
69fn write_execution_log(
70  path: &Path,
71  goal: &str,
72  started_at: &str,
73  final_status: &str,
74  completed_nodes: &[String],
75  steps: Vec<crate::types::ExecutionStepEntry>,
76) -> Result<(), String> {
77  let finished_at = chrono::Utc::now().to_rfc3339();
78  let log = ExecutionLog {
79    version: 1,
80    goal: goal.to_string(),
81    started_at: started_at.to_string(),
82    finished_at: Some(finished_at),
83    final_status: final_status.to_string(),
84    completed_nodes: completed_nodes.to_vec(),
85    steps,
86  };
87  let json = serde_json::to_string_pretty(&log).map_err(|e| e.to_string())?;
88  std::fs::write(path, json).map_err(|e| e.to_string())?;
89  Ok(())
90}
91
92/// Compiles the Attractor graph to a StreamWeave graph, runs it, and returns an [AttractorResult].
93/// Uses [crate::compile_attractor_graph]. Initial context includes the graph goal.
94/// When [RunOptions::execution_log_path] is set, writes execution.log.json to that path after the run.
95#[instrument(level = "trace", skip(ast, options))]
96pub async fn run_compiled_graph(
97  ast: &AttractorGraph,
98  options: RunOptions<'_>,
99) -> Result<AttractorResult, String> {
100  // When execution_log_path is set and log already has finished_at, return already_completed (e.g. second run).
101  if let Some(ref log_path) = options.execution_log_path
102    && let Ok(log) = load_execution_log(log_path)
103    && log.finished_at.is_some()
104  {
105    let exit_id = ast.find_exit().map(|n| n.id.as_str());
106    if let Some(from_log) = resume_state_from_log(&log, exit_id)
107      && from_log.already_completed
108    {
109      return Ok(AttractorResult {
110        last_outcome: NodeOutcome::success("Exit"),
111        completed_nodes: from_log.resume_state.completed_nodes.clone(),
112        context: from_log.resume_state.context.clone(),
113        already_completed: true,
114        run_summary: None,
115      });
116    }
117  }
118
119  // Optional early return from explicit resume_state.
120  if let Some(ref st) = options.resume_state {
121    let exit_id = ast
122      .find_exit()
123      .map(|n| n.id.clone())
124      .ok_or("missing exit node")?;
125    let at_exit = st.current_node_id == exit_id;
126    if options.resume_already_completed || at_exit {
127      let mut completed = st.completed_nodes.clone();
128      if !completed.contains(&exit_id) {
129        completed.push(exit_id);
130      }
131      return Ok(AttractorResult {
132        last_outcome: NodeOutcome::success("Exit"),
133        completed_nodes: completed,
134        context: st.context.clone(),
135        already_completed: true,
136        run_summary: None,
137      });
138    }
139  }
140
141  let stage_dir = options
142    .stage_dir
143    .as_deref()
144    .or_else(|| Some(std::path::Path::new(crate::DEFAULT_STAGE_DIR)));
145  let started_at = chrono::Utc::now().to_rfc3339();
146  let entry_node_id = options
147    .resume_state
148    .as_ref()
149    .map(|st| st.current_node_id.as_str());
150  let mut graph = crate::compiler::compile_attractor_graph(
151    ast,
152    entry_node_id,
153    options.agent_cmd.as_deref(),
154    stage_dir,
155  )?;
156  let initial = match &options.resume_state {
157    Some(st) => GraphPayload::from_resume_state(st),
158    None => {
159      let mut ctx = std::collections::HashMap::new();
160      ctx.insert("goal".to_string(), ast.goal.clone());
161      ctx.insert("graph.goal".to_string(), ast.goal.clone());
162      let start_id = ast
163        .find_start()
164        .map(|n| n.id.clone())
165        .ok_or("missing start node")?;
166      GraphPayload::initial(ctx, start_id)
167    }
168  };
169
170  let (tx_in, rx_in) = tokio::sync::mpsc::channel(1);
171  let (_tx_out, mut rx_out) = tokio::sync::mpsc::channel(16);
172  let (_tx_err, mut rx_err) = tokio::sync::mpsc::channel(16);
173
174  graph
175    .connect_input_channel("input", rx_in)
176    .map_err(|e| e.to_string())?;
177  graph
178    .connect_output_channel("output", _tx_out)
179    .map_err(|e| e.to_string())?;
180  let has_error_port = graph.connect_output_channel("error", _tx_err).is_ok();
181
182  tx_in
183    .send(Arc::new(initial) as Arc<dyn std::any::Any + Send + Sync>)
184    .await
185    .map_err(|e| e.to_string())?;
186  drop(tx_in);
187
188  tracing::trace!("run_streamweave_graph: calling graph.execute()");
189  graph.execute().await.map_err(|e| e.to_string())?;
190  tracing::trace!("run_streamweave_graph: execute done, waiting for first of output or error");
191  let first = if has_error_port {
192    tokio::select! {
193      Some(arc) = rx_out.recv() => Some(arc),
194      Some(arc) = rx_err.recv() => Some(arc),
195      else => None,
196    }
197  } else {
198    rx_out.recv().await
199  };
200  // Do not wait_for_completion(); first result decides outcome, avoids hang on merge graphs.
201
202  let payload = first
203    .and_then(|arc| arc.downcast::<GraphPayload>().ok())
204    .map(|p| (*p).clone());
205  let (context, last_outcome, completed_nodes, _current_node_id) = payload
206    .as_ref()
207    .map(|p| {
208      (
209        p.context.clone(),
210        p.outcome
211          .clone()
212          .unwrap_or_else(|| NodeOutcome::success("Exit")),
213        p.completed_nodes.clone(),
214        p.current_node_id.clone(),
215      )
216    })
217    .unwrap_or_else(|| {
218      (
219        std::collections::HashMap::new(),
220        NodeOutcome::success("Exit"),
221        vec![],
222        String::new(),
223      )
224    });
225
226  let finished_at = chrono::Utc::now().to_rfc3339();
227  let (success_count, failed_count) = match last_outcome.status {
228    OutcomeStatus::Success | OutcomeStatus::PartialSuccess | OutcomeStatus::Retry => {
229      (completed_nodes.len(), 0)
230    }
231    OutcomeStatus::Error => (completed_nodes.len().saturating_sub(1), 1),
232  };
233  let run_summary = RunSummary {
234    nodes_run: completed_nodes.len(),
235    success_count,
236    failed_count,
237    started_at: started_at.clone(),
238    finished_at: finished_at.clone(),
239  };
240  let result = AttractorResult {
241    last_outcome: last_outcome.clone(),
242    completed_nodes: completed_nodes.clone(),
243    context: context.clone(),
244    already_completed: false,
245    run_summary: Some(run_summary),
246  };
247  if let Some(ref log_path) = options.execution_log_path {
248    // use started_at from start of run
249    let status = if last_outcome.status == OutcomeStatus::Success
250      || last_outcome.status == OutcomeStatus::PartialSuccess
251    {
252      "success"
253    } else {
254      "error"
255    };
256    let _ = write_execution_log(
257      log_path,
258      &ast.goal,
259      &started_at,
260      status,
261      &completed_nodes,
262      vec![],
263    );
264  }
265  Ok(result)
266}