use crate::execution_log_io::{load_execution_log, resume_state_from_log};
use crate::types::{
AttractorGraph, AttractorResult, ExecutionLog, GraphPayload, NodeOutcome, OutcomeStatus,
ResumeState, RunSummary,
};
use std::path::Path;
use std::sync::Arc;
use tracing::instrument;
/// Drives a streamweave graph with a single input payload and returns its first output.
///
/// The function wires a fresh channel to the graph's `"input"` port and another to its
/// `"output"` port, sends `initial` (type-erased behind an `Arc`), closes the input side,
/// starts execution, reads one item from the output channel, and finally waits for the
/// graph to complete.
///
/// # Returns
///
/// `Ok(Some(item))` with the first item received on the output channel, or `Ok(None)` if
/// the output channel closed without yielding anything.
///
/// # Errors
///
/// Returns the stringified error if connecting either port, sending the initial payload,
/// executing the graph, or waiting for completion fails.
#[instrument(level = "trace", skip(graph, initial))]
pub async fn run_streamweave_graph(
    mut graph: streamweave::graph::Graph,
    initial: GraphPayload,
) -> Result<Option<Arc<dyn std::any::Any + Send + Sync>>, String> {
    let (input_tx, input_rx) = tokio::sync::mpsc::channel(1);
    let (output_tx, mut output_rx) = tokio::sync::mpsc::channel(16);

    if let Err(e) = graph.connect_input_channel("input", input_rx) {
        return Err(e.to_string());
    }
    if let Err(e) = graph.connect_output_channel("output", output_tx) {
        return Err(e.to_string());
    }

    // Type-erase the payload: the graph's ports carry `Arc<dyn Any + Send + Sync>` items.
    let seed: Arc<dyn std::any::Any + Send + Sync> = Arc::new(initial);
    if let Err(e) = input_tx.send(seed).await {
        return Err(e.to_string());
    }
    // Only one payload is ever sent, so close the input side right away.
    drop(input_tx);

    tracing::trace!("run_streamweave_graph: calling graph.execute()");
    if let Err(e) = graph.execute().await {
        return Err(e.to_string());
    }

    tracing::trace!("run_streamweave_graph: execute done, waiting for output on rx_out.recv()");
    let first_output = output_rx.recv().await;

    tracing::trace!("run_streamweave_graph: received output, calling wait_for_completion()");
    match graph.wait_for_completion().await {
        Ok(_) => Ok(first_output),
        Err(e) => Err(e.to_string()),
    }
}
/// Options controlling a single invocation of `run_compiled_graph`.
pub struct RunOptions<'a> {
/// Optional run directory.
/// NOTE(review): not read by the code visible in this part of the file — presumably
/// consumed elsewhere; confirm against callers.
pub run_dir: Option<&'a Path>,
/// State to resume from. When set, `run_compiled_graph` compiles the graph with
/// `current_node_id` as the entry node and builds the initial payload from this state.
pub resume_state: Option<ResumeState>,
/// When `true` and `resume_state` is set, `run_compiled_graph` returns an
/// already-completed result without executing the graph.
pub resume_already_completed: bool,
/// Optional agent command, forwarded to `compile_attractor_graph`.
pub agent_cmd: Option<String>,
/// Stage directory forwarded to `compile_attractor_graph`; when `None`,
/// `crate::DEFAULT_STAGE_DIR` is used instead.
pub stage_dir: Option<std::path::PathBuf>,
/// Path of the execution log. If set, it is loaded before running to detect a run that
/// already finished, and (re)written after the graph has run.
pub execution_log_path: Option<std::path::PathBuf>,
}
/// Writes a version-1 execution log for a run to `path` as pretty-printed JSON.
///
/// The log's `finished_at` is stamped with the current UTC time (RFC 3339) at the moment
/// this function is called; all other fields are copied from the arguments.
///
/// # Errors
///
/// Returns the stringified error if JSON serialization or the file write fails.
fn write_execution_log(
    path: &Path,
    goal: &str,
    started_at: &str,
    final_status: &str,
    completed_nodes: &[String],
    steps: Vec<crate::types::ExecutionStepEntry>,
) -> Result<(), String> {
    // Capture the completion timestamp before assembling the record.
    let completed_at = chrono::Utc::now().to_rfc3339();

    let record = ExecutionLog {
        version: 1,
        goal: goal.to_owned(),
        started_at: started_at.to_owned(),
        finished_at: Some(completed_at),
        final_status: final_status.to_owned(),
        completed_nodes: Vec::from(completed_nodes),
        steps,
    };

    match serde_json::to_string_pretty(&record) {
        Ok(serialized) => std::fs::write(path, serialized).map_err(|e| e.to_string()),
        Err(e) => Err(e.to_string()),
    }
}
#[instrument(level = "trace", skip(ast, options))]
pub async fn run_compiled_graph(
ast: &AttractorGraph,
options: RunOptions<'_>,
) -> Result<AttractorResult, String> {
if let Some(ref log_path) = options.execution_log_path
&& let Ok(log) = load_execution_log(log_path)
&& log.finished_at.is_some()
{
let exit_id = ast.find_exit().map(|n| n.id.as_str());
if let Some(from_log) = resume_state_from_log(&log, exit_id)
&& from_log.already_completed
{
return Ok(AttractorResult {
last_outcome: NodeOutcome::success("Exit"),
completed_nodes: from_log.resume_state.completed_nodes.clone(),
context: from_log.resume_state.context.clone(),
already_completed: true,
run_summary: None,
});
}
}
if let Some(ref st) = options.resume_state {
let exit_id = ast
.find_exit()
.map(|n| n.id.clone())
.ok_or("missing exit node")?;
let at_exit = st.current_node_id == exit_id;
if options.resume_already_completed || at_exit {
let mut completed = st.completed_nodes.clone();
if !completed.contains(&exit_id) {
completed.push(exit_id);
}
return Ok(AttractorResult {
last_outcome: NodeOutcome::success("Exit"),
completed_nodes: completed,
context: st.context.clone(),
already_completed: true,
run_summary: None,
});
}
}
let stage_dir = options
.stage_dir
.as_deref()
.or_else(|| Some(std::path::Path::new(crate::DEFAULT_STAGE_DIR)));
let started_at = chrono::Utc::now().to_rfc3339();
let entry_node_id = options
.resume_state
.as_ref()
.map(|st| st.current_node_id.as_str());
let mut graph = crate::compiler::compile_attractor_graph(
ast,
entry_node_id,
options.agent_cmd.as_deref(),
stage_dir,
)?;
let initial = match &options.resume_state {
Some(st) => GraphPayload::from_resume_state(st),
None => {
let mut ctx = std::collections::HashMap::new();
ctx.insert("goal".to_string(), ast.goal.clone());
ctx.insert("graph.goal".to_string(), ast.goal.clone());
let start_id = ast
.find_start()
.map(|n| n.id.clone())
.ok_or("missing start node")?;
GraphPayload::initial(ctx, start_id)
}
};
let (tx_in, rx_in) = tokio::sync::mpsc::channel(1);
let (_tx_out, mut rx_out) = tokio::sync::mpsc::channel(16);
let (_tx_err, mut rx_err) = tokio::sync::mpsc::channel(16);
graph
.connect_input_channel("input", rx_in)
.map_err(|e| e.to_string())?;
graph
.connect_output_channel("output", _tx_out)
.map_err(|e| e.to_string())?;
let has_error_port = graph.connect_output_channel("error", _tx_err).is_ok();
tx_in
.send(Arc::new(initial) as Arc<dyn std::any::Any + Send + Sync>)
.await
.map_err(|e| e.to_string())?;
drop(tx_in);
tracing::trace!("run_streamweave_graph: calling graph.execute()");
graph.execute().await.map_err(|e| e.to_string())?;
tracing::trace!("run_streamweave_graph: execute done, waiting for first of output or error");
let first = if has_error_port {
tokio::select! {
Some(arc) = rx_out.recv() => Some(arc),
Some(arc) = rx_err.recv() => Some(arc),
else => None,
}
} else {
rx_out.recv().await
};
let payload = first
.and_then(|arc| arc.downcast::<GraphPayload>().ok())
.map(|p| (*p).clone());
let (context, last_outcome, completed_nodes, _current_node_id) = payload
.as_ref()
.map(|p| {
(
p.context.clone(),
p.outcome
.clone()
.unwrap_or_else(|| NodeOutcome::success("Exit")),
p.completed_nodes.clone(),
p.current_node_id.clone(),
)
})
.unwrap_or_else(|| {
(
std::collections::HashMap::new(),
NodeOutcome::success("Exit"),
vec![],
String::new(),
)
});
let finished_at = chrono::Utc::now().to_rfc3339();
let (success_count, failed_count) = match last_outcome.status {
OutcomeStatus::Success | OutcomeStatus::PartialSuccess | OutcomeStatus::Retry => {
(completed_nodes.len(), 0)
}
OutcomeStatus::Error => (completed_nodes.len().saturating_sub(1), 1),
};
let run_summary = RunSummary {
nodes_run: completed_nodes.len(),
success_count,
failed_count,
started_at: started_at.clone(),
finished_at: finished_at.clone(),
};
let result = AttractorResult {
last_outcome: last_outcome.clone(),
completed_nodes: completed_nodes.clone(),
context: context.clone(),
already_completed: false,
run_summary: Some(run_summary),
};
if let Some(ref log_path) = options.execution_log_path {
let status = if last_outcome.status == OutcomeStatus::Success
|| last_outcome.status == OutcomeStatus::PartialSuccess
{
"success"
} else {
"error"
};
let _ = write_execution_log(
log_path,
&ast.goal,
&started_at,
status,
&completed_nodes,
vec![],
);
}
Ok(result)
}