1use std::sync::Arc;
13
14use indexmap::IndexMap;
15
16use crate::error::{BuildError, BuildErrors, GraphDiagnostics, GraphError, TerminalError};
17use crate::execution_engine::{ExecutionEngine, ExecutorState, NextAction};
18use crate::graph_analysis::{self, CycleAnalysis};
19use crate::node::{BarrierNode, ConditionNode, ExecutorOperation, FlowNode, LeafNode, NodeKind};
20use crate::state::{State, StateMerge};
21use crate::workflow_state::{MergeStrategy, WorkflowState};
22
/// Shared, thread-safe predicate over the workflow state.
///
/// An [`Edge`] may carry one of these; routing calls it with the current
/// state and treats `true` as "this edge may be taken".
pub type EdgeCondition<S> = Arc<dyn Fn(&S) -> bool + Send + Sync>;
27
28#[derive(Clone)]
30pub struct Edge<S: WorkflowState = State> {
31 pub from: String,
32 pub to: String,
33 pub condition: Option<EdgeCondition<S>>,
35 pub analysis: Option<EdgeAnalysis>,
37 pub fallback: bool,
39}
40
41impl<S: WorkflowState> Edge<S> {
42 pub fn is_conditional(&self) -> bool {
44 self.condition.is_some() && !self.fallback
45 }
46
47 pub fn is_normal(&self) -> bool {
49 self.condition.is_none() && !self.fallback
50 }
51}
52
53impl<S: WorkflowState> std::fmt::Debug for Edge<S> {
54 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55 f.debug_struct("Edge")
56 .field("from", &self.from)
57 .field("to", &self.to)
58 .field("has_condition", &self.condition.is_some())
59 .field("analysis", &self.analysis)
60 .field("fallback", &self.fallback)
61 .finish()
62 }
63}
64
/// Analysis metadata that can be attached to an [`Edge`].
#[derive(Clone, Debug)]
pub struct EdgeAnalysis {
    /// Optional visit bound for the edge.
    ///
    /// NOTE(review): presumably the maximum number of times the edge may be
    /// traversed; enforcement is not visible in this file — confirm.
    pub max_visits: Option<usize>,
}
71
72#[derive(Clone)]
76pub struct Graph<S: WorkflowState = State, M: MergeStrategy<S> = StateMerge> {
77 pub(crate) name: String,
78 pub(crate) nodes: IndexMap<String, NodeKind<S, M>>,
79 pub(crate) edges: Vec<Edge<S>>,
80 pub(crate) start: String,
81 pub(crate) end: String,
82}
83
84impl<S: WorkflowState, M: MergeStrategy<S>> Graph<S, M> {
85 pub fn name(&self) -> &str {
86 &self.name
87 }
88
89 pub fn node_names(&self) -> Vec<&str> {
90 self.nodes.keys().map(|s| s.as_str()).collect()
91 }
92
93 pub fn start_node(&self) -> &str {
94 &self.start
95 }
96
97 pub fn end_node(&self) -> &str {
98 &self.end
99 }
100
101 pub fn hash_u64(&self) -> u64 {
105 let mut s = String::new();
106 let mut names: Vec<&str> = self.nodes.keys().map(|k| k.as_str()).collect();
107 names.sort();
108 s.push_str(&names.join(","));
109 s.push('|');
110 let mut edge_strs: Vec<String> = self
111 .edges
112 .iter()
113 .map(|e| {
114 format!(
115 "{}->{}{:?}{}",
116 e.from,
117 e.to,
118 if e.condition.is_some() { "?" } else { "" },
119 if e.fallback { "!" } else { "" }
120 )
121 })
122 .collect();
123 edge_strs.sort();
124 s.push_str(&edge_strs.join(","));
125 fnv_hash(&s)
126 }
127
128 pub fn hash(&self) -> String {
130 format!("{:016x}", self.hash_u64())
131 }
132
133 pub fn edges_from(&self, from: &str) -> Vec<&Edge<S>> {
134 self.edges.iter().filter(|e| e.from == from).collect()
135 }
136
137 pub fn find_edge(&self, from: &str, to: &str) -> Option<&Edge<S>> {
138 self.edges.iter().find(|e| e.from == from && e.to == to)
139 }
140
141 pub fn node_map(&self) -> &IndexMap<String, NodeKind<S, M>> {
143 &self.nodes
144 }
145
146 fn resolve_next(&self, current: &str, state: &S) -> Option<String> {
150 for edge in self.edges_from(current) {
152 if edge.is_conditional() && edge.condition.as_ref().is_some_and(|c| c(state)) {
153 return Some(edge.to.clone());
154 }
155 }
156
157 for edge in self.edges_from(current) {
159 if edge.is_normal() {
160 return Some(edge.to.clone());
161 }
162 }
163
164 for edge in self.edges_from(current) {
166 if edge.fallback {
167 return Some(edge.to.clone());
168 }
169 }
170
171 None
172 }
173
174 pub(crate) fn resolve_next_inline(
176 &self,
177 current: &str,
178 state: &S,
179 ) -> Result<String, GraphError> {
180 if self.edges_from(current).is_empty() {
181 return Err(GraphError::Terminal(TerminalError::InvalidGraph(format!(
182 "node '{}' has no outgoing edges and is not the end node",
183 current
184 ))));
185 }
186
187 self.resolve_next(current, state).ok_or_else(|| {
188 GraphError::Terminal(TerminalError::InvalidGraph(format!(
189 "node '{}' has no matching outgoing edge",
190 current
191 )))
192 })
193 }
194
195 pub fn find_fallback_edge(&self, from: &str) -> Option<String> {
196 self.edges
197 .iter()
198 .find(|e| e.from == from && e.fallback)
199 .map(|e| e.to.clone())
200 }
201
202 pub fn validate(&self) -> Result<(), TerminalError> {
204 if !self.nodes.contains_key(&self.start) {
205 return Err(TerminalError::InvalidGraph(format!(
206 "start node '{}' not found",
207 self.start
208 )));
209 }
210
211 if !self.nodes.contains_key(&self.end) {
212 return Err(TerminalError::InvalidGraph(format!(
213 "end node '{}' not found",
214 self.end
215 )));
216 }
217
218 for edge in &self.edges {
219 if !self.nodes.contains_key(&edge.from) {
220 return Err(TerminalError::InvalidGraph(format!(
221 "edge references non-existent source node '{}'",
222 edge.from
223 )));
224 }
225 if !self.nodes.contains_key(&edge.to) {
226 return Err(TerminalError::InvalidGraph(format!(
227 "edge references non-existent target node '{}'",
228 edge.to
229 )));
230 }
231 }
232
233 Ok(())
234 }
235
236 pub fn analyze(&self) -> GraphDiagnostics {
238 graph_analysis::analyze_graph(self)
239 }
240
241 pub fn analyze_cycles(&self) -> CycleAnalysis {
243 let cycles = graph_analysis::find_all_cycles(self);
244 let unprotected = graph_analysis::filter_unprotected_cycles(self, &cycles);
245
246 CycleAnalysis {
247 has_cycles: !cycles.is_empty(),
248 cycles,
249 unprotected_cycles: unprotected,
250 total_edges: self.edges.len(),
251 protected_edges: self
252 .edges
253 .iter()
254 .filter(|e| e.analysis.as_ref().is_some_and(|a| a.max_visits.is_some()))
255 .count(),
256 }
257 }
258
259 pub async fn run_inline(
277 &self,
278 exec_ctx: &mut ExecutionEngine<S>,
279 max_steps: usize,
280 ) -> Result<(), GraphError> {
281 let mut current = self.start_node().to_string();
282 let mut step: usize = 0;
283
284 loop {
285 step += 1;
286 if step > max_steps {
287 return Err(GraphError::Terminal(TerminalError::StepsExceeded {
288 limit: max_steps,
289 }));
290 }
291
292 let node = self.nodes.get(¤t).ok_or_else(|| {
293 GraphError::Terminal(TerminalError::NodeNotFound(current.clone()))
294 })?;
295
296 match node {
298 NodeKind::Task(n) => {
299 let mut ctx = exec_ctx.build_node_context();
300 n.execute(&mut ctx).await?;
301 }
302 NodeKind::Condition(n) => {
303 let mut ctx = exec_ctx.build_leaf_context();
304 <ConditionNode<S> as LeafNode<S>>::execute(n, &mut ctx).await?;
305 }
306 NodeKind::Barrier(n) => {
307 let mut ctx = exec_ctx.build_leaf_context();
308 <BarrierNode<S> as LeafNode<S>>::execute(n, &mut ctx).await?;
309 }
310 NodeKind::External(n) => {
311 let mut ctx = exec_ctx.build_node_context();
312 n.execute(&mut ctx).await?;
313 }
314 NodeKind::ExternalLeaf(n) => {
315 let mut ctx = exec_ctx.build_leaf_context();
316 n.execute(&mut ctx).await?;
317 }
318 NodeKind::Parallel(p) => {
319 p.execute(exec_ctx).await?;
321 }
322 }
323
324 exec_ctx.commit();
326
327 let _flow_events = exec_ctx.take_flow_events();
329
330 let (next_action, _signal) = exec_ctx.take_control();
332
333 match next_action {
335 NextAction::End => return Ok(()),
336 NextAction::Goto(target) => {
337 current = target;
338 }
339 NextAction::Next => {
340 if current == self.end_node() {
341 return Ok(());
342 }
343 current = self.resolve_next_inline(¤t, exec_ctx.state())?;
344 }
345 }
346 }
347 }
348}
349
350pub struct PendingEdge<'a, S: WorkflowState = State, M: MergeStrategy<S> = StateMerge> {
354 builder: &'a mut GraphBuilder<S, M>,
355 edge_index: usize,
356}
357
358impl<'a, S: WorkflowState, M: MergeStrategy<S>> PendingEdge<'a, S, M> {
359 pub fn max_visits(self, n: usize) -> &'a mut GraphBuilder<S, M> {
360 self.builder.edges[self.edge_index].analysis = Some(EdgeAnalysis {
361 max_visits: Some(n),
362 });
363 self.builder
364 }
365}
366
367pub struct GraphBuilder<S: WorkflowState = State, M: MergeStrategy<S> = StateMerge> {
371 name: String,
372 nodes: IndexMap<String, NodeKind<S, M>>,
373 edges: Vec<Edge<S>>,
374 start: Option<String>,
375 end: Option<String>,
376}
377
378impl<S: WorkflowState, M: MergeStrategy<S>> GraphBuilder<S, M> {
379 pub fn new(name: impl Into<String>) -> Self {
385 Self {
386 name: name.into(),
387 nodes: IndexMap::new(),
388 edges: Vec::new(),
389 start: None,
390 end: None,
391 }
392 }
393
394 pub fn start(&mut self, node: impl Into<String>) -> &mut Self {
395 self.start = Some(node.into());
396 self
397 }
398
399 pub fn end(&mut self, node: impl Into<String>) -> &mut Self {
400 self.end = Some(node.into());
401 self
402 }
403
404 pub fn node(&mut self, name: impl Into<String>, kind: NodeKind<S, M>) -> &mut Self {
405 self.nodes.insert(name.into(), kind);
406 self
407 }
408
409 pub fn edge(
410 &mut self,
411 from: impl Into<String>,
412 to: impl Into<String>,
413 ) -> PendingEdge<'_, S, M> {
414 let edge_index = self.edges.len();
415 self.edges.push(Edge {
416 from: from.into(),
417 to: to.into(),
418 condition: None,
419 analysis: None,
420 fallback: false,
421 });
422 PendingEdge {
423 builder: self,
424 edge_index,
425 }
426 }
427
428 pub fn edge_if(
429 &mut self,
430 from: impl Into<String>,
431 to: impl Into<String>,
432 condition: impl Fn(&S) -> bool + Send + Sync + 'static,
433 ) -> PendingEdge<'_, S, M> {
434 let edge_index = self.edges.len();
435 self.edges.push(Edge {
436 from: from.into(),
437 to: to.into(),
438 condition: Some(Arc::new(condition)),
439 analysis: None,
440 fallback: false,
441 });
442 PendingEdge {
443 builder: self,
444 edge_index,
445 }
446 }
447
448 pub fn edge_fallback(
449 &mut self,
450 from: impl Into<String>,
451 to: impl Into<String>,
452 ) -> PendingEdge<'_, S, M> {
453 let edge_index = self.edges.len();
454 self.edges.push(Edge {
455 from: from.into(),
456 to: to.into(),
457 condition: None,
458 analysis: None,
459 fallback: true,
460 });
461 PendingEdge {
462 builder: self,
463 edge_index,
464 }
465 }
466
467 pub fn build(self) -> Result<Graph<S, M>, BuildErrors> {
468 let mut errors = BuildErrors::new();
469
470 let start = match self.start {
471 Some(s) => s,
472 None => {
473 errors.push(BuildError::MissingEntryPoint);
474 return Err(errors);
475 }
476 };
477 let end = match self.end {
478 Some(s) => s,
479 None => {
480 errors.push(BuildError::MissingExitPoint);
481 return Err(errors);
482 }
483 };
484
485 let mut seen_nodes = std::collections::HashSet::new();
486 for name in self.nodes.keys() {
487 if !seen_nodes.insert(name.clone()) {
488 errors.push(BuildError::DuplicateNode { id: name.clone() });
489 }
490 }
491
492 for edge in &self.edges {
493 if !self.nodes.contains_key(&edge.from) {
494 errors.push(BuildError::MissingNode {
495 from: edge.from.clone(),
496 to: edge.to.clone(),
497 });
498 }
499 if !self.nodes.contains_key(&edge.to) {
500 errors.push(BuildError::MissingNode {
501 from: edge.from.clone(),
502 to: edge.to.clone(),
503 });
504 }
505 }
506
507 if !errors.is_empty() {
508 return Err(errors);
509 }
510
511 let graph = Graph {
512 name: self.name,
513 nodes: self.nodes,
514 edges: self.edges,
515 start,
516 end,
517 };
518
519 if let Err(e) = graph.validate() {
520 return Err(BuildErrors(vec![BuildError::InvalidEdgeDefinition {
521 from: "graph".to_string(),
522 to: "graph".to_string(),
523 reason: e.to_string(),
524 }]));
525 }
526
527 Ok(graph)
528 }
529
530 pub fn name(&self) -> &str {
531 &self.name
532 }
533}
534
/// Hashes the UTF-8 bytes of `s` with 64-bit FNV-1a (xor the byte in, then
/// multiply by the prime, using wrapping arithmetic).
fn fnv_hash(s: &str) -> u64 {
    const OFFSET_BASIS: u64 = 0xcbf2_9ce4_8422_2325;
    const PRIME: u64 = 0x0000_0100_0000_01b3;

    s.bytes()
        .fold(OFFSET_BASIS, |acc, byte| (acc ^ u64::from(byte)).wrapping_mul(PRIME))
}