Skip to main content

lellm_graph/
node.rs

1//! 节点核心类型与模块。
2//!
3//! v0.4 终态架构:
4//!
5//! - `LeafNode<S>` — 声明式业务节点,只能读 State + emit Mutation
6//! - `ExecutorOperation<S>` — 命令式执行控制,可以 clone/merge/replace_state
7//! - `NodeKind<S, M>` — Graph 的 AST(不实现任何执行 trait)
8//! - `FlowNode<S>` — 向后兼容,等同于 LeafNode
9//!
10//! 职责边界:
11//!
12//! ```text
13//! Graph (AST)
14//!     └── NodeKind
15//!
16//! ExecutionEngine (runtime owner)
17//!     ├── dispatch → match NodeKind
18//!     ├── build_leaf_context() → LeafNode
19//!     └── pass &mut self → ExecutorOperation
20//!
21//! LeafNode
22//!     └── 只能 emit Mutation
23//!
24//! ExecutorOperation
25//!     └── 可以操纵 Executor(fork / merge / replace_state / subgraph)
26//! ```
27
28use std::sync::Arc;
29
30use async_trait::async_trait;
31
32use crate::error::GraphError;
33use crate::execution_engine::ExecutionEngine;
34pub use crate::node_context::LeafContext;
35use crate::node_context::NodeContext;
36use crate::state::{State, StateMerge};
37use crate::workflow_state::{MergeStrategy, WorkflowState};
38
39// ─── 子模块重新导出 ────────────────────────────────────────────
40
41pub use crate::barrier_node::{BarrierDefaultAction, BarrierNode};
42pub use crate::parallel_node::{ParallelErrorStrategy, ParallelNode, ParallelNodeBuilder};
43
44// ─── LeafNode Trait ───────────────────────────────────────────
45
46/// 声明式业务节点 — 只能读 State + emit Mutation。
47///
48/// 设计原则:
49/// - **只能读 State**(`ctx.state()` 返回 `&S`)
50/// - **只能 emit Mutation**(`ctx.record()`)
51/// - **不能 replace_state / clone_state / fork / merge**
52///
53/// 与 `ExecutorOperation` 完全不同维度:
54/// - LeafNode = 业务逻辑(Task, Condition, Barrier, LLM, Tool)
55/// - ExecutorOperation = 执行控制(Parallel, Retry, Loop, SubGraph)
56///
57/// # 泛型参数
58///
59/// - `S` — 类型化状态(默认 `State` = HashMap,向后兼容)
60#[async_trait]
61pub trait LeafNode<S: WorkflowState = State>: Send + Sync {
62    /// 执行节点逻辑。
63    async fn execute(&self, ctx: &mut LeafContext<'_, S>) -> Result<(), GraphError>;
64}
65
66// ─── ExecutorOperation Trait ──────────────────────────────────
67
68/// 命令式执行控制 — Composite 节点使用。
69///
70/// 直接接收 `&mut ExecutionEngine<S>`,拥有完整能力:
71/// - clone_state / replace_state
72/// - build_leaf_context(用于执行子分支)
73///
74/// 这不是"节点",而是 ExecutionEngine 的内部控制逻辑扩展。
75#[async_trait]
76pub trait ExecutorOperation<S: WorkflowState = State>: Send + Sync {
77    /// 执行组合操作。
78    async fn execute(&self, engine: &mut ExecutionEngine<S>) -> Result<(), GraphError>;
79}
80
81// ─── Backward Compat: FlowNode ────────────────────────────────
82
83/// 向后兼容 — `FlowNode` trait。
84///
85/// 保留此名称以兼容现有代码。
86/// 接收 `NodeContext`(持有 `&mut S`),以便旧代码继续工作。
87#[async_trait]
88pub trait FlowNode<S: WorkflowState = State>: Send + Sync {
89    /// 执行节点逻辑。
90    async fn execute(&self, ctx: &mut NodeContext<'_, S>) -> Result<(), GraphError>;
91}
92
93// ─── NodeKind (AST Only) ──────────────────────────────────────
94
95/// Graph 的 AST — 节点类型枚举。
96///
97/// **不实现任何执行 trait。** 它只是数据结构。
98/// 执行分发由 ExecutionEngine 的 match 负责。
99///
100/// # 泛型参数
101///
102/// - `S` — 类型化状态(默认 `State` = HashMap,向后兼容)
103/// - `M` — 并行合并策略(仅 `Parallel` 变体使用,默认 [`StateMerge`])
104pub enum NodeKind<S: WorkflowState = State, M: MergeStrategy<S> = StateMerge> {
105    /// 自定义逻辑
106    Task(TaskNode<S>),
107    /// 条件分支
108    Condition(ConditionNode<S>),
109    /// Human-in-the-loop 审批屏障
110    Barrier(BarrierNode<S>),
111    /// 并行执行多个分支
112    Parallel(ParallelNode<S, M>),
113    /// 外部节点(由 lellm-agent 等 crate 提供)— 向后兼容,使用 NodeContext
114    External(Arc<dyn FlowNode<S>>),
115    /// 外部 Leaf 节点 — 只能读 State + emit Mutation
116    ExternalLeaf(Arc<dyn LeafNode<S>>),
117}
118
119impl<S: WorkflowState, M: MergeStrategy<S>> Clone for NodeKind<S, M> {
120    fn clone(&self) -> Self {
121        match self {
122            Self::Task(n) => Self::Task(n.clone()),
123            Self::Condition(n) => Self::Condition(n.clone()),
124            Self::Barrier(n) => Self::Barrier(n.clone()),
125            Self::Parallel(n) => Self::Parallel(n.clone()),
126            Self::External(n) => Self::External(n.clone()),
127            Self::ExternalLeaf(n) => Self::ExternalLeaf(n.clone()),
128        }
129    }
130}
131
132// ─── TaskNode ────────────────────────────────────────────────
133
134/// Task 节点回调类型别名(向后兼容 NodeContext)。
135pub type TaskFn<S> = Arc<dyn Fn(&mut NodeContext<'_, S>) -> Result<(), GraphError> + Send + Sync>;
136
137/// 自定义逻辑节点。
138#[derive(Clone)]
139pub struct TaskNode<S: WorkflowState = State> {
140    pub name: String,
141    pub func: TaskFn<S>,
142}
143
144impl<S: WorkflowState> TaskNode<S> {
145    pub fn new(
146        name: impl Into<String>,
147        func: impl Fn(&mut NodeContext<'_, S>) -> Result<(), GraphError> + Send + Sync + 'static,
148    ) -> Self {
149        Self {
150            name: name.into(),
151            func: Arc::new(func),
152        }
153    }
154}
155
156/// TaskNode 实现 FlowNode(向后兼容 — 使用 NodeContext)。
157///
158/// 未来将迁移到 LeafNode + LeafContext。
159#[async_trait]
160impl<S: WorkflowState> FlowNode<S> for TaskNode<S> {
161    async fn execute(&self, ctx: &mut NodeContext<'_, S>) -> Result<(), GraphError> {
162        (self.func)(ctx)
163    }
164}
165
166// ─── ConditionNode ───────────────────────────────────────────
167
168/// 条件分支回调类型别名。
169pub type BranchCondition<S> = Arc<dyn Fn(&S) -> bool + Send + Sync>;
170
171/// 条件分支节点。
172#[derive(Clone)]
173pub struct ConditionNode<S: WorkflowState = State> {
174    pub name: String,
175    pub branches: Vec<(String, BranchCondition<S>)>,
176}
177
178impl<S: WorkflowState> ConditionNode<S> {
179    pub fn builder(name: impl Into<String>) -> ConditionNodeBuilder<S> {
180        ConditionNodeBuilder {
181            name: name.into(),
182            branches: Vec::new(),
183        }
184    }
185}
186
187/// ConditionNode 构建器。
188pub struct ConditionNodeBuilder<S: WorkflowState = State> {
189    name: String,
190    branches: Vec<(String, BranchCondition<S>)>,
191}
192
193impl<S: WorkflowState> ConditionNodeBuilder<S> {
194    pub fn branch(
195        mut self,
196        target: impl Into<String>,
197        condition: impl Fn(&S) -> bool + Send + Sync + 'static,
198    ) -> Self {
199        self.branches.push((target.into(), Arc::new(condition)));
200        self
201    }
202
203    pub fn build(self) -> ConditionNode<S> {
204        ConditionNode {
205            name: self.name,
206            branches: self.branches,
207        }
208    }
209}
210
211/// ConditionNode 实现 LeafNode(推荐路径 — 只读 state + goto)。
212#[async_trait]
213impl<S: WorkflowState> LeafNode<S> for ConditionNode<S> {
214    async fn execute(&self, ctx: &mut LeafContext<'_, S>) -> Result<(), GraphError> {
215        let state = ctx.state();
216        for (target, condition) in &self.branches {
217            if condition(state) {
218                ctx.goto(target);
219                return Ok(());
220            }
221        }
222        Ok(())
223    }
224}
225
226/// ConditionNode 实现 FlowNode(向后兼容 — 使用 NodeContext)。
227#[async_trait]
228impl<S: WorkflowState> FlowNode<S> for ConditionNode<S> {
229    async fn execute(&self, ctx: &mut NodeContext<'_, S>) -> Result<(), GraphError> {
230        let state = ctx.state();
231        for (target, condition) in &self.branches {
232            if condition(state) {
233                ctx.goto(target);
234                return Ok(());
235            }
236        }
237        Ok(())
238    }
239}
240
241// ─── Backward Compatibility Alias ─────────────────────────────
242
243/// 向后兼容别名 — `GraphNode` → `FlowNode`。
244pub type GraphNode<S> = dyn FlowNode<S>;