1use crate::execution_log_io::{load_execution_log, resume_state_from_log};
7use crate::types::{
8 AttractorGraph, AttractorResult, ExecutionLog, GraphPayload, NodeOutcome, OutcomeStatus,
9 ResumeState, RunSummary,
10};
11use std::path::Path;
12use std::sync::Arc;
13use tracing::instrument;
14#[instrument(level = "trace", skip(graph, initial))]
20pub async fn run_streamweave_graph(
21 mut graph: streamweave::graph::Graph,
22 initial: GraphPayload,
23) -> Result<Option<Arc<dyn std::any::Any + Send + Sync>>, String> {
24 let (tx_in, rx_in) = tokio::sync::mpsc::channel(1);
25 let (_tx_out, mut rx_out) = tokio::sync::mpsc::channel(16);
26
27 graph
28 .connect_input_channel("input", rx_in)
29 .map_err(|e| e.to_string())?;
30 graph
31 .connect_output_channel("output", _tx_out)
32 .map_err(|e| e.to_string())?;
33
34 tx_in
35 .send(Arc::new(initial) as Arc<dyn std::any::Any + Send + Sync>)
36 .await
37 .map_err(|e| e.to_string())?;
38 drop(tx_in);
39
40 tracing::trace!("run_streamweave_graph: calling graph.execute()");
41 graph.execute().await.map_err(|e| e.to_string())?;
42 tracing::trace!("run_streamweave_graph: execute done, waiting for output on rx_out.recv()");
43 let first = rx_out.recv().await;
44 tracing::trace!("run_streamweave_graph: received output, calling wait_for_completion()");
45 graph
46 .wait_for_completion()
47 .await
48 .map_err(|e| e.to_string())?;
49 Ok(first)
50}
51
52pub struct RunOptions<'a> {
54 pub run_dir: Option<&'a Path>,
56 pub resume_state: Option<ResumeState>,
58 pub resume_already_completed: bool,
60 pub agent_cmd: Option<String>,
62 pub stage_dir: Option<std::path::PathBuf>,
64 pub execution_log_path: Option<std::path::PathBuf>,
66}
67
68fn write_execution_log(
70 path: &Path,
71 goal: &str,
72 started_at: &str,
73 final_status: &str,
74 completed_nodes: &[String],
75 steps: Vec<crate::types::ExecutionStepEntry>,
76) -> Result<(), String> {
77 let finished_at = chrono::Utc::now().to_rfc3339();
78 let log = ExecutionLog {
79 version: 1,
80 goal: goal.to_string(),
81 started_at: started_at.to_string(),
82 finished_at: Some(finished_at),
83 final_status: final_status.to_string(),
84 completed_nodes: completed_nodes.to_vec(),
85 steps,
86 };
87 let json = serde_json::to_string_pretty(&log).map_err(|e| e.to_string())?;
88 std::fs::write(path, json).map_err(|e| e.to_string())?;
89 Ok(())
90}
91
92#[instrument(level = "trace", skip(ast, options))]
96pub async fn run_compiled_graph(
97 ast: &AttractorGraph,
98 options: RunOptions<'_>,
99) -> Result<AttractorResult, String> {
100 if let Some(ref log_path) = options.execution_log_path
102 && let Ok(log) = load_execution_log(log_path)
103 && log.finished_at.is_some()
104 {
105 let exit_id = ast.find_exit().map(|n| n.id.as_str());
106 if let Some(from_log) = resume_state_from_log(&log, exit_id)
107 && from_log.already_completed
108 {
109 return Ok(AttractorResult {
110 last_outcome: NodeOutcome::success("Exit"),
111 completed_nodes: from_log.resume_state.completed_nodes.clone(),
112 context: from_log.resume_state.context.clone(),
113 already_completed: true,
114 run_summary: None,
115 });
116 }
117 }
118
119 if let Some(ref st) = options.resume_state {
121 let exit_id = ast
122 .find_exit()
123 .map(|n| n.id.clone())
124 .ok_or("missing exit node")?;
125 let at_exit = st.current_node_id == exit_id;
126 if options.resume_already_completed || at_exit {
127 let mut completed = st.completed_nodes.clone();
128 if !completed.contains(&exit_id) {
129 completed.push(exit_id);
130 }
131 return Ok(AttractorResult {
132 last_outcome: NodeOutcome::success("Exit"),
133 completed_nodes: completed,
134 context: st.context.clone(),
135 already_completed: true,
136 run_summary: None,
137 });
138 }
139 }
140
141 let stage_dir = options
142 .stage_dir
143 .as_deref()
144 .or_else(|| Some(std::path::Path::new(crate::DEFAULT_STAGE_DIR)));
145 let started_at = chrono::Utc::now().to_rfc3339();
146 let entry_node_id = options
147 .resume_state
148 .as_ref()
149 .map(|st| st.current_node_id.as_str());
150 let mut graph = crate::compiler::compile_attractor_graph(
151 ast,
152 entry_node_id,
153 options.agent_cmd.as_deref(),
154 stage_dir,
155 )?;
156 let initial = match &options.resume_state {
157 Some(st) => GraphPayload::from_resume_state(st),
158 None => {
159 let mut ctx = std::collections::HashMap::new();
160 ctx.insert("goal".to_string(), ast.goal.clone());
161 ctx.insert("graph.goal".to_string(), ast.goal.clone());
162 let start_id = ast
163 .find_start()
164 .map(|n| n.id.clone())
165 .ok_or("missing start node")?;
166 GraphPayload::initial(ctx, start_id)
167 }
168 };
169
170 let (tx_in, rx_in) = tokio::sync::mpsc::channel(1);
171 let (_tx_out, mut rx_out) = tokio::sync::mpsc::channel(16);
172 let (_tx_err, mut rx_err) = tokio::sync::mpsc::channel(16);
173
174 graph
175 .connect_input_channel("input", rx_in)
176 .map_err(|e| e.to_string())?;
177 graph
178 .connect_output_channel("output", _tx_out)
179 .map_err(|e| e.to_string())?;
180 let has_error_port = graph.connect_output_channel("error", _tx_err).is_ok();
181
182 tx_in
183 .send(Arc::new(initial) as Arc<dyn std::any::Any + Send + Sync>)
184 .await
185 .map_err(|e| e.to_string())?;
186 drop(tx_in);
187
188 tracing::trace!("run_streamweave_graph: calling graph.execute()");
189 graph.execute().await.map_err(|e| e.to_string())?;
190 tracing::trace!("run_streamweave_graph: execute done, waiting for first of output or error");
191 let first = if has_error_port {
192 tokio::select! {
193 Some(arc) = rx_out.recv() => Some(arc),
194 Some(arc) = rx_err.recv() => Some(arc),
195 else => None,
196 }
197 } else {
198 rx_out.recv().await
199 };
200 let payload = first
203 .and_then(|arc| arc.downcast::<GraphPayload>().ok())
204 .map(|p| (*p).clone());
205 let (context, last_outcome, completed_nodes, _current_node_id) = payload
206 .as_ref()
207 .map(|p| {
208 (
209 p.context.clone(),
210 p.outcome
211 .clone()
212 .unwrap_or_else(|| NodeOutcome::success("Exit")),
213 p.completed_nodes.clone(),
214 p.current_node_id.clone(),
215 )
216 })
217 .unwrap_or_else(|| {
218 (
219 std::collections::HashMap::new(),
220 NodeOutcome::success("Exit"),
221 vec![],
222 String::new(),
223 )
224 });
225
226 let finished_at = chrono::Utc::now().to_rfc3339();
227 let (success_count, failed_count) = match last_outcome.status {
228 OutcomeStatus::Success | OutcomeStatus::PartialSuccess | OutcomeStatus::Retry => {
229 (completed_nodes.len(), 0)
230 }
231 OutcomeStatus::Error => (completed_nodes.len().saturating_sub(1), 1),
232 };
233 let run_summary = RunSummary {
234 nodes_run: completed_nodes.len(),
235 success_count,
236 failed_count,
237 started_at: started_at.clone(),
238 finished_at: finished_at.clone(),
239 };
240 let result = AttractorResult {
241 last_outcome: last_outcome.clone(),
242 completed_nodes: completed_nodes.clone(),
243 context: context.clone(),
244 already_completed: false,
245 run_summary: Some(run_summary),
246 };
247 if let Some(ref log_path) = options.execution_log_path {
248 let status = if last_outcome.status == OutcomeStatus::Success
250 || last_outcome.status == OutcomeStatus::PartialSuccess
251 {
252 "success"
253 } else {
254 "error"
255 };
256 let _ = write_execution_log(
257 log_path,
258 &ast.goal,
259 &started_at,
260 status,
261 &completed_nodes,
262 vec![],
263 );
264 }
265 Ok(result)
266}