Skip to main content

lellm_graph/graph/
graph.rs

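//! Graph — core graph types.
//!
//! Edges follow a three-kind model:
//! - **Conditional edges** (`edge_if`) — an `if/else-if` rule chain, evaluated in registration
//!   order; the first match wins.
//! - **Normal edges** (`edge`) — unconditional and non-fallback; taken when no conditional edge
//!   matches.
//! - **Fallback edges** (`edge_fallback`) — the last resort.
//!
//! v0.4+: generic over the state type, `Graph<S: WorkflowState>`, with `S = State` as the
//! default (backward compatible).
//!
//! Runtime safety (e.g. bounding cyclic graphs) is enforced by the `max_steps` argument of
//! `run_inline()`.
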
12use std::sync::Arc;
13
14use indexmap::IndexMap;
15
16use super::graph_analysis::{self, CycleAnalysis};
17use super::graph_builder::fnv_hash;
18use crate::error::{GraphDiagnostics, GraphError, TerminalError};
19use crate::exec::execution_engine::{ExecutionEngine, ExecutionSignal, ExecutorState, NextAction};
20use crate::node::{BarrierNode, ConditionNode, FlowNode, LeafNode, NodeKind};
21use crate::state::workflow_state::{MergeStrategy, WorkflowState};
22use crate::state::{State, StateMerge};
23
// ─── StepCallback ─────────────────────────────────────────────

/// Per-step hook invoked by [`Graph::run_inline`] after every node finishes.
///
/// Intended for wrappers (such as `run_execution_loop`) that maintain an execution log or emit
/// per-node events. It is called after the step's mutations are committed and its checkpoint
/// is emitted, and before the routing signal is taken from the execution engine.
pub trait StepCallback<'e>: Send {
    /// Called once a node has finished executing.
    ///
    /// - `node_name` — name of the node that just ran
    /// - `step` — current step number, starting at 1
    /// - `duration` — wall-clock time measured from just before the node was dispatched until
    ///   this callback, so it also includes the commit and checkpoint emission for the step
    fn on_step(&mut self, node_name: &str, step: usize, duration: std::time::Duration);
}

/// A [`StepCallback`] that ignores every notification.
///
/// Zero-sized; derive `Default`/`Copy` so callers can create one wherever a callback is
/// required but no tracking is wanted.
#[derive(Debug, Clone, Copy, Default)]
pub struct NoopStepCallback;

impl StepCallback<'_> for NoopStepCallback {
    fn on_step(&mut self, _node_name: &str, _step: usize, _duration: std::time::Duration) {}
}

// ─── Edge ──────────────────────────────────────────────────────

/// Routing predicate carried by a conditional edge.
///
/// It is given the current workflow state and answers whether the edge should be taken.
/// Wrapped in `Arc` so edges can be cloned cheaply; the `Sync + Send` bounds let the
/// predicate be shared across threads.
pub type EdgeCondition<S> = Arc<dyn Fn(&S) -> bool + Sync + Send>;

51/// 边(Edge)— 三类边模型。
52#[derive(Clone)]
53pub struct Edge<S: WorkflowState = State> {
54    pub from: String,
55    pub to: String,
56    /// 路由条件。Some = 条件边;None = 普通边或 fallback 边。
57    pub condition: Option<EdgeCondition<S>>,
58    /// 分析用约束(不参与 runtime 决策)
59    pub analysis: Option<EdgeAnalysis>,
60    /// 是否为 fallback 边(最后兜底)
61    pub fallback: bool,
62}
63
64impl<S: WorkflowState> Edge<S> {
65    /// 判断是否为条件边。
66    pub fn is_conditional(&self) -> bool {
67        self.condition.is_some() && !self.fallback
68    }
69
70    /// 判断是否为普通边(无条件非 fallback)。
71    pub fn is_normal(&self) -> bool {
72        self.condition.is_none() && !self.fallback
73    }
74}
75
76impl<S: WorkflowState> std::fmt::Debug for Edge<S> {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        f.debug_struct("Edge")
79            .field("from", &self.from)
80            .field("to", &self.to)
81            .field("has_condition", &self.condition.is_some())
82            .field("analysis", &self.analysis)
83            .field("fallback", &self.fallback)
84            .finish()
85    }
86}
87
/// Analysis-only constraint attached to an edge.
///
/// Read by static cycle analysis (e.g. `Graph::analyze_cycles()`); runtime routing never
/// consults it — only `Edge::condition` and `Edge::fallback` drive routing decisions.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct EdgeAnalysis {
    /// Suggested maximum number of visits. Edges with `Some(_)` are counted as protected by
    /// `Graph::analyze_cycles()`.
    pub max_visits: Option<usize>,
}

95// ─── Graph ─────────────────────────────────────────────────────
96
97/// 图(Graph)— 允许有环,循环保护由 GraphExecutor::max_steps 运行时熔断提供。
98#[derive(Clone)]
99pub struct Graph<S: WorkflowState = State, M: MergeStrategy<S> = StateMerge> {
100    pub(crate) name: String,
101    pub(crate) nodes: IndexMap<String, NodeKind<S, M>>,
102    pub(crate) edges: Vec<Edge<S>>,
103    pub(crate) start: String,
104    pub(crate) end: String,
105    /// P0-2: Canonical AST hash — 从 DSL 层计算,不依赖 HashMap 顺序。
106    /// 用于 Checkpoint 的 graph compatibility 校验。
107    pub(crate) canonical_hash: u64,
108}
109
110impl<S: WorkflowState, M: MergeStrategy<S>> Graph<S, M> {
111    pub fn name(&self) -> &str {
112        &self.name
113    }
114
115    pub fn node_names(&self) -> Vec<&str> {
116        self.nodes.keys().map(|s| s.as_str()).collect()
117    }
118
119    pub fn start_node(&self) -> &str {
120        &self.start
121    }
122
123    pub fn end_node(&self) -> &str {
124        &self.end
125    }
126
127    /// 获取 canonical AST hash — 从 DSL 层计算,不依赖 HashMap 顺序。
128    ///
129    /// 用于 Checkpoint 的 graph compatibility 校验。
130    /// 相同输入永远产生相同 hash,Checkpoint 不会因此失效。
131    pub fn canonical_hash(&self) -> u64 {
132        self.canonical_hash
133    }
134
135    /// 计算图结构指纹 hash(u64 原始值)— 基于 compiled graph 结构。
136    ///
137    /// 注意:此 hash 依赖 HashMap 迭代顺序,可能不稳定。
138    /// 优先使用 `canonical_hash()` 进行 Checkpoint 校验。
139    pub fn hash_u64(&self) -> u64 {
140        let mut s = String::new();
141        let mut names: Vec<&str> = self.nodes.keys().map(|k| k.as_str()).collect();
142        names.sort();
143        s.push_str(&names.join(","));
144        s.push('|');
145        let mut edge_strs: Vec<String> = self
146            .edges
147            .iter()
148            .map(|e| {
149                format!(
150                    "{}->{}{:?}{}",
151                    e.from,
152                    e.to,
153                    if e.condition.is_some() { "?" } else { "" },
154                    if e.fallback { "!" } else { "" }
155                )
156            })
157            .collect();
158        edge_strs.sort();
159        s.push_str(&edge_strs.join(","));
160        fnv_hash(&s)
161    }
162
163    /// 计算图结构指纹 hash(hex 字符串)。
164    pub fn hash(&self) -> String {
165        format!("{:016x}", self.canonical_hash)
166    }
167
168    pub fn edges_from(&self, from: &str) -> Vec<&Edge<S>> {
169        self.edges.iter().filter(|e| e.from == from).collect()
170    }
171
172    pub fn find_edge(&self, from: &str, to: &str) -> Option<&Edge<S>> {
173        self.edges.iter().find(|e| e.from == from && e.to == to)
174    }
175
176    /// 获取节点映射表引用。
177    pub fn node_map(&self) -> &IndexMap<String, NodeKind<S, M>> {
178        &self.nodes
179    }
180
181    /// 路由解析 — 根据当前节点和 State 找到下一个节点(返回 Option)。
182    ///
183    /// 内部统一使用的边评估逻辑。无匹配时返回 `None`(不区分"无边"和"无匹配")。
184    fn resolve_next(&self, current: &str, state: &S) -> Option<String> {
185        // 1. 条件边
186        for edge in self.edges_from(current) {
187            if edge.is_conditional() && edge.condition.as_ref().is_some_and(|c| c(state)) {
188                return Some(edge.to.clone());
189            }
190        }
191
192        // 2. 普通边
193        for edge in self.edges_from(current) {
194            if edge.is_normal() {
195                return Some(edge.to.clone());
196            }
197        }
198
199        // 3. Fallback 边
200        for edge in self.edges_from(current) {
201            if edge.fallback {
202                return Some(edge.to.clone());
203            }
204        }
205
206        None
207    }
208
209    /// 路由解析 — 内联执行使用,无匹配时返回错误。
210    pub(crate) fn resolve_next_inline(
211        &self,
212        current: &str,
213        state: &S,
214    ) -> Result<String, GraphError> {
215        if self.edges_from(current).is_empty() {
216            return Err(GraphError::Terminal(TerminalError::InvalidGraph(format!(
217                "node '{}' has no outgoing edges and is not the end node",
218                current
219            ))));
220        }
221
222        self.resolve_next(current, state).ok_or_else(|| {
223            GraphError::Terminal(TerminalError::InvalidGraph(format!(
224                "node '{}' has no matching outgoing edge",
225                current
226            )))
227        })
228    }
229
230    pub fn find_fallback_edge(&self, from: &str) -> Option<String> {
231        self.edges
232            .iter()
233            .find(|e| e.from == from && e.fallback)
234            .map(|e| e.to.clone())
235    }
236
237    /// 验证图结构。
238    pub fn validate(&self) -> Result<(), TerminalError> {
239        if !self.nodes.contains_key(&self.start) {
240            return Err(TerminalError::InvalidGraph(format!(
241                "start node '{}' not found",
242                self.start
243            )));
244        }
245
246        if !self.nodes.contains_key(&self.end) {
247            return Err(TerminalError::InvalidGraph(format!(
248                "end node '{}' not found",
249                self.end
250            )));
251        }
252
253        for edge in &self.edges {
254            if !self.nodes.contains_key(&edge.from) {
255                return Err(TerminalError::InvalidGraph(format!(
256                    "edge references non-existent source node '{}'",
257                    edge.from
258                )));
259            }
260            if !self.nodes.contains_key(&edge.to) {
261                return Err(TerminalError::InvalidGraph(format!(
262                    "edge references non-existent target node '{}'",
263                    edge.to
264                )));
265            }
266        }
267
268        Ok(())
269    }
270
271    /// 完整图诊断分析。
272    pub fn analyze(&self) -> GraphDiagnostics {
273        graph_analysis::analyze_graph(self)
274    }
275
276    /// @deprecated 使用 [`analyze()`](Self::analyze) 替代。
277    pub fn analyze_cycles(&self) -> CycleAnalysis {
278        let cycles = graph_analysis::find_all_cycles(self);
279        let unprotected = graph_analysis::filter_unprotected_cycles(self, &cycles);
280
281        CycleAnalysis {
282            has_cycles: !cycles.is_empty(),
283            cycles,
284            unprotected_cycles: unprotected,
285            total_edges: self.edges.len(),
286            protected_edges: self
287                .edges
288                .iter()
289                .filter(|e| e.analysis.as_ref().is_some_and(|a| a.max_visits.is_some()))
290                .count(),
291        }
292    }
293
294    // ─── 内联执行 ────────────────────────────────────────────
295
296    /// 内联执行 — 唯一的执行路径。
297    ///
298    /// 接收 [`ExecutionEngine`](借用 State),内部循环构建 [`NodeContext`](能力视图)
299    /// 供节点使用。
300    ///
301    /// 数据流:
302    /// ```text
303    /// ExecutionEngine
304    ///   → build_node_context()  → NodeContext<'_, S>
305    ///   → node.execute(ctx)     → 节点 record() Mutations
306    ///   → drop(ctx)             → 释放借用
307    ///   → commit()              → apply Mutations 到 State
308    ///   → emit_checkpoint()     → 通知 CheckpointSink
309    ///   → step_cb.on_step()     → 通知 wrapper(追踪/事件)
310    ///   → take_control()        → 获取路由信号
311    /// ```
312    ///
313    /// # StepCallback
314    ///
315    /// 每步回调在 commit + checkpoint 之后、take_control 之前调用。
316    /// 用于 wrapper(如 `run_execution_loop`)追踪 execution_log 或发射 per-node 事件。
317    pub async fn run_inline<'cb>(
318        &self,
319        exec_ctx: &mut ExecutionEngine<'_, S>,
320        max_steps: usize,
321        step_cb: &mut dyn StepCallback<'cb>,
322    ) -> Result<(), GraphError> {
323        let mut current = self.start_node().to_string();
324        let mut step: usize = 0;
325
326        loop {
327            step += 1;
328            if step > max_steps {
329                return Err(GraphError::Terminal(TerminalError::StepsExceeded {
330                    limit: max_steps,
331                }));
332            }
333
334            let node = self.nodes.get(&current).ok_or_else(|| {
335                GraphError::Terminal(TerminalError::NodeNotFound(current.clone()))
336            })?;
337
338            let node_start = std::time::Instant::now();
339
340            // 根据 NodeKind 分发执行
341            match node {
342                NodeKind::Task(n) => {
343                    let mut ctx = exec_ctx.build_node_context();
344                    n.execute(&mut ctx).await?;
345                }
346                NodeKind::Condition(n) => {
347                    let mut ctx = exec_ctx.build_leaf_context();
348                    <ConditionNode<S> as LeafNode<S>>::execute(n, &mut ctx).await?;
349                }
350                NodeKind::Barrier(n) => {
351                    let mut ctx = exec_ctx.build_leaf_context();
352                    <BarrierNode<S> as LeafNode<S>>::execute(n, &mut ctx).await?;
353                }
354                NodeKind::External(n) => {
355                    let mut ctx = exec_ctx.build_node_context();
356                    n.execute(&mut ctx).await?;
357                }
358                NodeKind::ExternalLeaf(n) => {
359                    let mut ctx = exec_ctx.build_leaf_context();
360                    n.execute(&mut ctx).await?;
361                }
362                NodeKind::Parallel(p) => {
363                    // ExecutorOperation 直接接收 &mut ExecutionEngine
364                    p.execute(exec_ctx).await?;
365                }
366                NodeKind::Subgraph(spec) => {
367                    // Subgraph 执行 — 通过 CompiledSubgraph 的 StateProjector 递归执行内层 Graph
368                    let stream = exec_ctx.stream_sink();
369                    let cancel = exec_ctx.cancel_token().clone();
370                    spec.execute(exec_ctx.state_mut(), stream, cancel).await?;
371                }
372            }
373
374            // commit mutations (Unit of Work) — 对 Parallel 是空操作
375            exec_ctx.commit();
376
377            // checkpoint — 通知 Sink 到达了合法的恢复边界。
378            // 顺序:execute → commit → checkpoint → step_cb → route
379            exec_ctx.emit_checkpoint(&current, step);
380
381            // 每步回调 — 供 wrapper 追踪 execution_log / 发射 per-node 事件
382            step_cb.on_step(&current, step, node_start.elapsed());
383
384            // 提取控制信号
385            let (next_action, signal) = exec_ctx.take_control();
386
387            // 处理 Barrier Pause 信号
388            if let Some(ExecutionSignal::Pause {
389                barrier_id,
390                timeout,
391            }) = signal
392            {
393                let outcome = exec_ctx.wait_barrier(&barrier_id, timeout).await;
394                match outcome {
395                    crate::node::barrier_sink::BarrierOutcome::Decision(
396                        crate::event::BarrierDecision::Reroute { target },
397                    ) => {
398                        current = target;
399                        continue;
400                    }
401                    crate::node::barrier_sink::BarrierOutcome::Decision(
402                        crate::event::BarrierDecision::Approve
403                        | crate::event::BarrierDecision::Reject { .. }
404                        | crate::event::BarrierDecision::Modify { .. },
405                    ) => {
406                        // Approve/Reject/Modify — 继续正常路由
407                    }
408                    crate::node::barrier_sink::BarrierOutcome::TimedOut => {
409                        // 超时 — 默认 Reject 语义,继续正常路由
410                    }
411                    crate::node::barrier_sink::BarrierOutcome::Cancelled => {
412                        return Err(GraphError::Terminal(
413                            crate::error::TerminalError::BarrierCancelled { node: current },
414                        ));
415                    }
416                }
417            }
418
419            // 处理路由
420            match next_action {
421                NextAction::End => return Ok(()),
422                NextAction::Goto(target) => {
423                    current = target;
424                }
425                NextAction::Next => {
426                    if current == self.end_node() {
427                        return Ok(());
428                    }
429                    current = self.resolve_next_inline(&current, exec_ctx.state())?;
430                }
431            }
432        }
433    }
434}
435
// GraphBuilder and PendingEdge are re-exported from mod.rs.