1use std::sync::Arc;
13
14use indexmap::IndexMap;
15
16use super::graph_analysis::{self, CycleAnalysis};
17use super::graph_builder::fnv_hash;
18use crate::error::{GraphDiagnostics, GraphError, TerminalError};
19use crate::exec::execution_engine::{ExecutionEngine, ExecutionSignal, ExecutorState, NextAction};
20use crate::node::{BarrierNode, ConditionNode, FlowNode, LeafNode, NodeKind};
21use crate::state::workflow_state::{MergeStrategy, WorkflowState};
22use crate::state::{State, StateMerge};
23
24pub trait StepCallback<'e>: Send {
31 fn on_step(&mut self, node_name: &str, step: usize, duration: std::time::Duration);
37}
38
39pub struct NoopStepCallback;
41
42impl<'e> StepCallback<'e> for NoopStepCallback {
43 fn on_step(&mut self, _node_name: &str, _step: usize, _duration: std::time::Duration) {}
44}
45
46pub type EdgeCondition<S> = Arc<dyn Fn(&S) -> bool + Send + Sync>;
50
51#[derive(Clone)]
53pub struct Edge<S: WorkflowState = State> {
54 pub from: String,
55 pub to: String,
56 pub condition: Option<EdgeCondition<S>>,
58 pub analysis: Option<EdgeAnalysis>,
60 pub fallback: bool,
62}
63
64impl<S: WorkflowState> Edge<S> {
65 pub fn is_conditional(&self) -> bool {
67 self.condition.is_some() && !self.fallback
68 }
69
70 pub fn is_normal(&self) -> bool {
72 self.condition.is_none() && !self.fallback
73 }
74}
75
76impl<S: WorkflowState> std::fmt::Debug for Edge<S> {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 f.debug_struct("Edge")
79 .field("from", &self.from)
80 .field("to", &self.to)
81 .field("has_condition", &self.condition.is_some())
82 .field("analysis", &self.analysis)
83 .field("fallback", &self.fallback)
84 .finish()
85 }
86}
87
88#[derive(Debug, Clone)]
90pub struct EdgeAnalysis {
91 pub max_visits: Option<usize>,
93}
94
95#[derive(Clone)]
99pub struct Graph<S: WorkflowState = State, M: MergeStrategy<S> = StateMerge> {
100 pub(crate) name: String,
101 pub(crate) nodes: IndexMap<String, NodeKind<S, M>>,
102 pub(crate) edges: Vec<Edge<S>>,
103 pub(crate) start: String,
104 pub(crate) end: String,
105 pub(crate) canonical_hash: u64,
108}
109
110impl<S: WorkflowState, M: MergeStrategy<S>> Graph<S, M> {
111 pub fn name(&self) -> &str {
112 &self.name
113 }
114
115 pub fn node_names(&self) -> Vec<&str> {
116 self.nodes.keys().map(|s| s.as_str()).collect()
117 }
118
119 pub fn start_node(&self) -> &str {
120 &self.start
121 }
122
123 pub fn end_node(&self) -> &str {
124 &self.end
125 }
126
127 pub fn canonical_hash(&self) -> u64 {
132 self.canonical_hash
133 }
134
135 pub fn hash_u64(&self) -> u64 {
140 let mut s = String::new();
141 let mut names: Vec<&str> = self.nodes.keys().map(|k| k.as_str()).collect();
142 names.sort();
143 s.push_str(&names.join(","));
144 s.push('|');
145 let mut edge_strs: Vec<String> = self
146 .edges
147 .iter()
148 .map(|e| {
149 format!(
150 "{}->{}{:?}{}",
151 e.from,
152 e.to,
153 if e.condition.is_some() { "?" } else { "" },
154 if e.fallback { "!" } else { "" }
155 )
156 })
157 .collect();
158 edge_strs.sort();
159 s.push_str(&edge_strs.join(","));
160 fnv_hash(&s)
161 }
162
163 pub fn hash(&self) -> String {
165 format!("{:016x}", self.canonical_hash)
166 }
167
168 pub fn edges_from(&self, from: &str) -> Vec<&Edge<S>> {
169 self.edges.iter().filter(|e| e.from == from).collect()
170 }
171
172 pub fn find_edge(&self, from: &str, to: &str) -> Option<&Edge<S>> {
173 self.edges.iter().find(|e| e.from == from && e.to == to)
174 }
175
176 pub fn node_map(&self) -> &IndexMap<String, NodeKind<S, M>> {
178 &self.nodes
179 }
180
181 fn resolve_next(&self, current: &str, state: &S) -> Option<String> {
185 for edge in self.edges_from(current) {
187 if edge.is_conditional() && edge.condition.as_ref().is_some_and(|c| c(state)) {
188 return Some(edge.to.clone());
189 }
190 }
191
192 for edge in self.edges_from(current) {
194 if edge.is_normal() {
195 return Some(edge.to.clone());
196 }
197 }
198
199 for edge in self.edges_from(current) {
201 if edge.fallback {
202 return Some(edge.to.clone());
203 }
204 }
205
206 None
207 }
208
209 pub(crate) fn resolve_next_inline(
211 &self,
212 current: &str,
213 state: &S,
214 ) -> Result<String, GraphError> {
215 if self.edges_from(current).is_empty() {
216 return Err(GraphError::Terminal(TerminalError::InvalidGraph(format!(
217 "node '{}' has no outgoing edges and is not the end node",
218 current
219 ))));
220 }
221
222 self.resolve_next(current, state).ok_or_else(|| {
223 GraphError::Terminal(TerminalError::InvalidGraph(format!(
224 "node '{}' has no matching outgoing edge",
225 current
226 )))
227 })
228 }
229
230 pub fn find_fallback_edge(&self, from: &str) -> Option<String> {
231 self.edges
232 .iter()
233 .find(|e| e.from == from && e.fallback)
234 .map(|e| e.to.clone())
235 }
236
237 pub fn validate(&self) -> Result<(), TerminalError> {
239 if !self.nodes.contains_key(&self.start) {
240 return Err(TerminalError::InvalidGraph(format!(
241 "start node '{}' not found",
242 self.start
243 )));
244 }
245
246 if !self.nodes.contains_key(&self.end) {
247 return Err(TerminalError::InvalidGraph(format!(
248 "end node '{}' not found",
249 self.end
250 )));
251 }
252
253 for edge in &self.edges {
254 if !self.nodes.contains_key(&edge.from) {
255 return Err(TerminalError::InvalidGraph(format!(
256 "edge references non-existent source node '{}'",
257 edge.from
258 )));
259 }
260 if !self.nodes.contains_key(&edge.to) {
261 return Err(TerminalError::InvalidGraph(format!(
262 "edge references non-existent target node '{}'",
263 edge.to
264 )));
265 }
266 }
267
268 Ok(())
269 }
270
271 pub fn analyze(&self) -> GraphDiagnostics {
273 graph_analysis::analyze_graph(self)
274 }
275
276 pub fn analyze_cycles(&self) -> CycleAnalysis {
278 let cycles = graph_analysis::find_all_cycles(self);
279 let unprotected = graph_analysis::filter_unprotected_cycles(self, &cycles);
280
281 CycleAnalysis {
282 has_cycles: !cycles.is_empty(),
283 cycles,
284 unprotected_cycles: unprotected,
285 total_edges: self.edges.len(),
286 protected_edges: self
287 .edges
288 .iter()
289 .filter(|e| e.analysis.as_ref().is_some_and(|a| a.max_visits.is_some()))
290 .count(),
291 }
292 }
293
294 pub async fn run_inline<'cb>(
318 &self,
319 exec_ctx: &mut ExecutionEngine<'_, S>,
320 max_steps: usize,
321 step_cb: &mut dyn StepCallback<'cb>,
322 ) -> Result<(), GraphError> {
323 let mut current = self.start_node().to_string();
324 let mut step: usize = 0;
325
326 loop {
327 step += 1;
328 if step > max_steps {
329 return Err(GraphError::Terminal(TerminalError::StepsExceeded {
330 limit: max_steps,
331 }));
332 }
333
334 let node = self.nodes.get(¤t).ok_or_else(|| {
335 GraphError::Terminal(TerminalError::NodeNotFound(current.clone()))
336 })?;
337
338 let node_start = std::time::Instant::now();
339
340 match node {
342 NodeKind::Task(n) => {
343 let mut ctx = exec_ctx.build_node_context();
344 n.execute(&mut ctx).await?;
345 }
346 NodeKind::Condition(n) => {
347 let mut ctx = exec_ctx.build_leaf_context();
348 <ConditionNode<S> as LeafNode<S>>::execute(n, &mut ctx).await?;
349 }
350 NodeKind::Barrier(n) => {
351 let mut ctx = exec_ctx.build_leaf_context();
352 <BarrierNode<S> as LeafNode<S>>::execute(n, &mut ctx).await?;
353 }
354 NodeKind::External(n) => {
355 let mut ctx = exec_ctx.build_node_context();
356 n.execute(&mut ctx).await?;
357 }
358 NodeKind::ExternalLeaf(n) => {
359 let mut ctx = exec_ctx.build_leaf_context();
360 n.execute(&mut ctx).await?;
361 }
362 NodeKind::Parallel(p) => {
363 p.execute(exec_ctx).await?;
365 }
366 NodeKind::Subgraph(spec) => {
367 let stream = exec_ctx.stream_sink();
369 let cancel = exec_ctx.cancel_token().clone();
370 spec.execute(exec_ctx.state_mut(), stream, cancel).await?;
371 }
372 }
373
374 exec_ctx.commit();
376
377 exec_ctx.emit_checkpoint(¤t, step);
380
381 step_cb.on_step(¤t, step, node_start.elapsed());
383
384 let (next_action, signal) = exec_ctx.take_control();
386
387 if let Some(ExecutionSignal::Pause {
389 barrier_id,
390 timeout,
391 }) = signal
392 {
393 let outcome = exec_ctx.wait_barrier(&barrier_id, timeout).await;
394 match outcome {
395 crate::node::barrier_sink::BarrierOutcome::Decision(
396 crate::event::BarrierDecision::Reroute { target },
397 ) => {
398 current = target;
399 continue;
400 }
401 crate::node::barrier_sink::BarrierOutcome::Decision(
402 crate::event::BarrierDecision::Approve
403 | crate::event::BarrierDecision::Reject { .. }
404 | crate::event::BarrierDecision::Modify { .. },
405 ) => {
406 }
408 crate::node::barrier_sink::BarrierOutcome::TimedOut => {
409 }
411 crate::node::barrier_sink::BarrierOutcome::Cancelled => {
412 return Err(GraphError::Terminal(
413 crate::error::TerminalError::BarrierCancelled { node: current },
414 ));
415 }
416 }
417 }
418
419 match next_action {
421 NextAction::End => return Ok(()),
422 NextAction::Goto(target) => {
423 current = target;
424 }
425 NextAction::Next => {
426 if current == self.end_node() {
427 return Ok(());
428 }
429 current = self.resolve_next_inline(¤t, exec_ctx.state())?;
430 }
431 }
432 }
433 }
434}
435
436