use std::sync::Arc;
use indexmap::IndexMap;
use super::graph_analysis::{self, CycleAnalysis};
use super::graph_builder::fnv_hash;
use crate::error::{GraphDiagnostics, GraphError, TerminalError};
use crate::exec::execution_engine::{ExecutionEngine, ExecutionSignal, ExecutorState, NextAction};
use crate::node::{BarrierNode, ConditionNode, FlowNode, LeafNode, NodeKind};
use crate::state::workflow_state::{MergeStrategy, WorkflowState};
use crate::state::{State, StateMerge};
/// Observer that `Graph::run_inline` notifies after each executed node.
pub trait StepCallback<'e>: Send {
/// Called once per step with the name of the node that just ran, the step
/// counter (the first step is `1`), and the time elapsed since that node
/// started executing.
fn on_step(&mut self, node_name: &str, step: usize, duration: std::time::Duration);
}
/// A [`StepCallback`] that ignores every notification.
pub struct NoopStepCallback;
impl<'e> StepCallback<'e> for NoopStepCallback {
/// Intentionally does nothing.
fn on_step(&mut self, _node_name: &str, _step: usize, _duration: std::time::Duration) {}
}
/// Shared, thread-safe predicate over the workflow state, used as the
/// optional condition of an [`Edge`].
pub type EdgeCondition<S> = Arc<dyn Fn(&S) -> bool + Send + Sync>;
/// A directed connection between two nodes of a [`Graph`].
#[derive(Clone)]
pub struct Edge<S: WorkflowState = State> {
/// Name of the node this edge leaves.
pub from: String,
/// Name of the node this edge leads to.
pub to: String,
/// Optional predicate over the workflow state guarding this edge.
pub condition: Option<EdgeCondition<S>>,
/// Optional analysis metadata attached to this edge.
pub analysis: Option<EdgeAnalysis>,
/// Marks the edge as a fallback route; fallback edges are considered last
/// when the graph resolves the next node.
pub fallback: bool,
}
impl<S: WorkflowState> Edge<S> {
    /// Returns `true` when the edge carries a condition and is not flagged
    /// as a fallback.
    pub fn is_conditional(&self) -> bool {
        matches!((&self.condition, self.fallback), (Some(_), false))
    }

    /// Returns `true` when the edge has neither a condition nor the
    /// fallback flag.
    pub fn is_normal(&self) -> bool {
        matches!((&self.condition, self.fallback), (None, false))
    }
}
/// Manual `Debug` implementation: the condition closure is not printable, so
/// only its presence is reported through a `has_condition` field.
impl<S: WorkflowState> std::fmt::Debug for Edge<S> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            from,
            to,
            condition,
            analysis,
            fallback,
        } = self;
        let has_condition = condition.is_some();

        let mut out = f.debug_struct("Edge");
        out.field("from", from);
        out.field("to", to);
        out.field("has_condition", &has_condition);
        out.field("analysis", analysis);
        out.field("fallback", fallback);
        out.finish()
    }
}
/// Analysis metadata that can be attached to an [`Edge`].
#[derive(Debug, Clone)]
pub struct EdgeAnalysis {
/// Optional visit limit for the edge. Edges where this is `Some` are counted
/// as protected by `Graph::analyze_cycles`.
/// NOTE(review): where and how the limit is enforced is not visible here — confirm.
pub max_visits: Option<usize>,
}
/// A named workflow graph: nodes keyed by name, the edges between them, and
/// designated start and end nodes.
#[derive(Clone)]
pub struct Graph<S: WorkflowState = State, M: MergeStrategy<S> = StateMerge> {
/// Name of the graph.
pub(crate) name: String,
/// Nodes keyed by node name, kept in the map's iteration order.
pub(crate) nodes: IndexMap<String, NodeKind<S, M>>,
/// All edges of the graph; their order determines resolution priority
/// among edges of the same kind.
pub(crate) edges: Vec<Edge<S>>,
/// Name of the node where execution starts.
pub(crate) start: String,
/// Name of the node at which a plain "next" transition ends the run.
pub(crate) end: String,
/// Stored hash returned by `canonical_hash()` and, hex-encoded, by `hash()`.
/// NOTE(review): presumably computed when the graph is built — confirm in the builder.
pub(crate) canonical_hash: u64,
}
impl<S: WorkflowState, M: MergeStrategy<S>> Graph<S, M> {
/// Returns the graph's name.
pub fn name(&self) -> &str {
    self.name.as_str()
}
/// Returns the names of all nodes, in the node map's iteration order.
pub fn node_names(&self) -> Vec<&str> {
    let mut names = Vec::with_capacity(self.nodes.len());
    for key in self.nodes.keys() {
        names.push(key.as_str());
    }
    names
}
/// Returns the name of the node where execution starts.
pub fn start_node(&self) -> &str {
    self.start.as_str()
}
/// Returns the name of the graph's end node.
pub fn end_node(&self) -> &str {
    self.end.as_str()
}
/// Returns the hash value stored on the graph (it is not recomputed here).
pub fn canonical_hash(&self) -> u64 {
self.canonical_hash
}
/// Hashes a textual summary of the graph's structure with `fnv_hash`.
///
/// The summary is the sorted node names joined by `,`, then a `|` separator,
/// then the sorted edge descriptors joined by `,`. Because both lists are
/// sorted, the result does not depend on node or edge insertion order.
pub fn hash_u64(&self) -> u64 {
    let mut node_names: Vec<&str> = Vec::with_capacity(self.nodes.len());
    node_names.extend(self.nodes.keys().map(String::as_str));
    node_names.sort_unstable();

    let mut descriptors: Vec<String> = Vec::with_capacity(self.edges.len());
    for edge in &self.edges {
        let condition_marker = if edge.condition.is_some() { "?" } else { "" };
        let fallback_marker = if edge.fallback { "!" } else { "" };
        // NOTE(review): `{:?}` renders the condition marker with surrounding
        // quotes (`"?"` / `""`). Kept as-is because altering the format would
        // change every hash value this method produces.
        descriptors.push(format!(
            "{}->{}{:?}{}",
            edge.from, edge.to, condition_marker, fallback_marker
        ));
    }
    descriptors.sort_unstable();

    let summary = format!("{}|{}", node_names.join(","), descriptors.join(","));
    fnv_hash(&summary)
}
/// Returns the stored canonical hash as a zero-padded, 16-digit lowercase
/// hexadecimal string.
pub fn hash(&self) -> String {
    let digest = self.canonical_hash;
    format!("{:016x}", digest)
}
/// Collects every edge whose source node is `from`, preserving edge-list order.
pub fn edges_from(&self, from: &str) -> Vec<&Edge<S>> {
    let mut outgoing = Vec::new();
    for edge in &self.edges {
        if edge.from == from {
            outgoing.push(edge);
        }
    }
    outgoing
}
/// Returns the first edge in edge-list order that goes from `from` to `to`,
/// or `None` when there is no such edge.
pub fn find_edge(&self, from: &str, to: &str) -> Option<&Edge<S>> {
    for edge in &self.edges {
        if edge.from == from && edge.to == to {
            return Some(edge);
        }
    }
    None
}
/// Borrows the full node map (node name to node), in its iteration order.
pub fn node_map(&self) -> &IndexMap<String, NodeKind<S, M>> {
&self.nodes
}
fn resolve_next(&self, current: &str, state: &S) -> Option<String> {
for edge in self.edges_from(current) {
if edge.is_conditional() && edge.condition.as_ref().is_some_and(|c| c(state)) {
return Some(edge.to.clone());
}
}
for edge in self.edges_from(current) {
if edge.is_normal() {
return Some(edge.to.clone());
}
}
for edge in self.edges_from(current) {
if edge.fallback {
return Some(edge.to.clone());
}
}
None
}
/// Resolves the successor of `current`, turning a missing successor into an
/// invalid-graph error.
///
/// # Errors
/// Returns `GraphError::Terminal(TerminalError::InvalidGraph(..))` when
/// `current` has no outgoing edges at all, or when it has outgoing edges but
/// none of them is selected for `state`.
pub(crate) fn resolve_next_inline(
    &self,
    current: &str,
    state: &S,
) -> Result<String, GraphError> {
    let invalid = |message: String| GraphError::Terminal(TerminalError::InvalidGraph(message));

    let has_outgoing = self.edges.iter().any(|edge| edge.from == current);
    if !has_outgoing {
        return Err(invalid(format!(
            "node '{}' has no outgoing edges and is not the end node",
            current
        )));
    }

    match self.resolve_next(current, state) {
        Some(target) => Ok(target),
        None => Err(invalid(format!(
            "node '{}' has no matching outgoing edge",
            current
        ))),
    }
}
/// Returns the target of the first fallback edge leaving `from`, if any.
pub fn find_fallback_edge(&self, from: &str) -> Option<String> {
    self.edges
        .iter()
        .find_map(|edge| (edge.fallback && edge.from == from).then(|| edge.to.clone()))
}
pub fn validate(&self) -> Result<(), TerminalError> {
if !self.nodes.contains_key(&self.start) {
return Err(TerminalError::InvalidGraph(format!(
"start node '{}' not found",
self.start
)));
}
if !self.nodes.contains_key(&self.end) {
return Err(TerminalError::InvalidGraph(format!(
"end node '{}' not found",
self.end
)));
}
for edge in &self.edges {
if !self.nodes.contains_key(&edge.from) {
return Err(TerminalError::InvalidGraph(format!(
"edge references non-existent source node '{}'",
edge.from
)));
}
if !self.nodes.contains_key(&edge.to) {
return Err(TerminalError::InvalidGraph(format!(
"edge references non-existent target node '{}'",
edge.to
)));
}
}
Ok(())
}
/// Produces diagnostics for this graph by delegating to
/// `graph_analysis::analyze_graph`.
pub fn analyze(&self) -> GraphDiagnostics {
graph_analysis::analyze_graph(self)
}
pub fn analyze_cycles(&self) -> CycleAnalysis {
let cycles = graph_analysis::find_all_cycles(self);
let unprotected = graph_analysis::filter_unprotected_cycles(self, &cycles);
CycleAnalysis {
has_cycles: !cycles.is_empty(),
cycles,
unprotected_cycles: unprotected,
total_edges: self.edges.len(),
protected_edges: self
.edges
.iter()
.filter(|e| e.analysis.as_ref().is_some_and(|a| a.max_visits.is_some()))
.count(),
}
}
pub async fn run_inline<'cb>(
&self,
exec_ctx: &mut ExecutionEngine<'_, S>,
max_steps: usize,
step_cb: &mut dyn StepCallback<'cb>,
) -> Result<(), GraphError> {
let mut current = self.start_node().to_string();
let mut step: usize = 0;
loop {
step += 1;
if step > max_steps {
return Err(GraphError::Terminal(TerminalError::StepsExceeded {
limit: max_steps,
}));
}
let node = self.nodes.get(¤t).ok_or_else(|| {
GraphError::Terminal(TerminalError::NodeNotFound(current.clone()))
})?;
let node_start = std::time::Instant::now();
match node {
NodeKind::Task(n) => {
let mut ctx = exec_ctx.build_node_context();
n.execute(&mut ctx).await?;
}
NodeKind::Condition(n) => {
let mut ctx = exec_ctx.build_leaf_context();
<ConditionNode<S> as LeafNode<S>>::execute(n, &mut ctx).await?;
}
NodeKind::Barrier(n) => {
let mut ctx = exec_ctx.build_leaf_context();
<BarrierNode<S> as LeafNode<S>>::execute(n, &mut ctx).await?;
}
NodeKind::External(n) => {
let mut ctx = exec_ctx.build_node_context();
n.execute(&mut ctx).await?;
}
NodeKind::ExternalLeaf(n) => {
let mut ctx = exec_ctx.build_leaf_context();
n.execute(&mut ctx).await?;
}
NodeKind::Parallel(p) => {
p.execute(exec_ctx).await?;
}
NodeKind::Subgraph(spec) => {
let stream = exec_ctx.stream_sink();
let cancel = exec_ctx.cancel_token().clone();
spec.execute(exec_ctx.state_mut(), stream, cancel).await?;
}
}
exec_ctx.commit();
exec_ctx.emit_checkpoint(¤t, step);
step_cb.on_step(¤t, step, node_start.elapsed());
let (next_action, signal) = exec_ctx.take_control();
if let Some(ExecutionSignal::Pause {
barrier_id,
timeout,
}) = signal
{
let outcome = exec_ctx.wait_barrier(&barrier_id, timeout).await;
match outcome {
crate::node::barrier_sink::BarrierOutcome::Decision(
crate::event::BarrierDecision::Reroute { target },
) => {
current = target;
continue;
}
crate::node::barrier_sink::BarrierOutcome::Decision(
crate::event::BarrierDecision::Approve
| crate::event::BarrierDecision::Reject { .. }
| crate::event::BarrierDecision::Modify { .. },
) => {
}
crate::node::barrier_sink::BarrierOutcome::TimedOut => {
}
crate::node::barrier_sink::BarrierOutcome::Cancelled => {
return Err(GraphError::Terminal(
crate::error::TerminalError::BarrierCancelled { node: current },
));
}
}
}
match next_action {
NextAction::End => return Ok(()),
NextAction::Goto(target) => {
current = target;
}
NextAction::Next => {
if current == self.end_node() {
return Ok(());
}
current = self.resolve_next_inline(¤t, exec_ctx.state())?;
}
}
}
}
}