Skip to main content

lellm_graph/node/
node.rs

1//! 节点核心类型与模块。
2//!
3//! v0.4 终态架构:
4//!
5//! - `LeafNode<S>` — 声明式业务节点,只能读 State + emit Mutation
6//! - `ExecutorOperation<S>` — 命令式执行控制,可以 clone/merge/replace_state
7//! - `NodeKind<S, M>` — Graph 的 AST(不实现任何执行 trait)
8//! - `FlowNode<S>` — 向后兼容,等同于 LeafNode
9//!
10//! 职责边界:
11//!
12//! ```text
13//! Graph (AST)
14//!     └── NodeKind
15//!
16//! ExecutionEngine (runtime owner)
17//!     ├── dispatch → match NodeKind
18//!     ├── build_leaf_context() → LeafNode
19//!     └── pass &mut self → ExecutorOperation
20//!
21//! LeafNode
22//!     └── 只能 emit Mutation
23//!
24//! ExecutorOperation
25//!     └── 可以操纵 Executor(fork / merge / replace_state / subgraph)
26//! ```
27
28use std::sync::Arc;
29
30use async_trait::async_trait;
31
32pub use super::node_context::LeafContext;
33use super::node_context::NodeContext;
34use crate::error::GraphError;
35use crate::exec::execution_engine::ExecutionEngine;
36use crate::state::workflow_state::{MergeStrategy, WorkflowState};
37use crate::state::{State, StateMerge};
38
39// ─── 子模块重新导出 ────────────────────────────────────────────
40
41pub use super::barrier_node::{BarrierDefaultAction, BarrierNode};
42pub use super::parallel_node::{ParallelErrorStrategy, ParallelNode, ParallelNodeBuilder};
43
44// ─── LeafNode Trait ───────────────────────────────────────────
45
46/// 声明式业务节点 — 只能读 State + emit Mutation。
47///
48/// 设计原则:
49/// - **只能读 State**(`ctx.state()` 返回 `&S`)
50/// - **只能 emit Mutation**(`ctx.record()`)
51/// - **不能 replace_state / clone_state / fork / merge**
52///
53/// 与 `ExecutorOperation` 完全不同维度:
54/// - LeafNode = 业务逻辑(Task, Condition, Barrier, LLM, Tool)
55/// - ExecutorOperation = 执行控制(Parallel, Retry, Loop, SubGraph)
56///
57/// # 泛型参数
58///
59/// - `S` — 类型化状态(默认 `State` = HashMap,向后兼容)
60#[async_trait]
61pub trait LeafNode<S: WorkflowState = State>: Send + Sync {
62    /// 执行节点逻辑。
63    async fn execute(&self, ctx: &mut LeafContext<'_, S>) -> Result<(), GraphError>;
64}
65
66// ─── ExecutorOperation Trait ──────────────────────────────────
67
68/// 命令式执行控制 — Composite 节点使用。
69///
70/// 直接接收 `&mut ExecutionEngine`,拥有完整能力:
71/// - clone_state / replace_state
72/// - build_leaf_context(用于执行子分支)
73///
74/// 这不是"节点",而是 ExecutionEngine 的内部控制逻辑扩展。
75#[async_trait]
76pub trait ExecutorOperation<S: WorkflowState = State>: Send + Sync {
77    /// 执行组合操作。
78    async fn execute(&self, engine: &mut ExecutionEngine<'_, S>) -> Result<(), GraphError>;
79}
80
81// ─── Backward Compat: FlowNode ────────────────────────────────
82
83/// 向后兼容 — `FlowNode` trait。
84///
85/// 保留此名称以兼容现有代码。
86/// 接收 `NodeContext`(持有 `&mut S`),以便旧代码继续工作。
87#[async_trait]
88pub trait FlowNode<S: WorkflowState = State>: Send + Sync {
89    /// 执行节点逻辑。
90    async fn execute(&self, ctx: &mut NodeContext<'_, S>) -> Result<(), GraphError>;
91}
92
93// ─── NodeKind (AST Only) ──────────────────────────────────────
94
95/// Graph 的 AST — 节点类型枚举。
96///
97/// **不实现任何执行 trait。** 它只是数据结构。
98/// 执行分发由 ExecutionEngine 的 match 负责。
99///
100/// # 泛型参数
101///
102/// - `S` — 类型化状态(默认 `State` = HashMap,向后兼容)
103/// - `M` — 并行合并策略(仅 `Parallel` 变体使用,默认 [`StateMerge`])
104pub enum NodeKind<S: WorkflowState = State, M: MergeStrategy<S> = StateMerge> {
105    /// 自定义逻辑
106    Task(TaskNode<S>),
107    /// 条件分支
108    Condition(ConditionNode<S>),
109    /// Human-in-the-loop 审批屏障
110    Barrier(BarrierNode<S>),
111    /// 并行执行多个分支
112    Parallel(ParallelNode<S, M>),
113    /// 外部节点(由 lellm-agent 等 crate 提供)— 向后兼容,使用 NodeContext
114    External(Arc<dyn FlowNode<S>>),
115    /// 外部 Leaf 节点 — 只能读 State + emit Mutation
116    ExternalLeaf(Arc<dyn LeafNode<S>>),
117    /// Subgraph 节点 — 运行时递归执行内层 Graph
118    ///
119    /// CompiledSubgraph 持有类型擦除的 StateProjector,
120    /// 包含 Graph + Lens + Merge 的执行逻辑。
121    Subgraph(super::compiled_subgraph::CompiledSubgraph<S>),
122}
123
124impl<S: WorkflowState, M: MergeStrategy<S>> Clone for NodeKind<S, M> {
125    fn clone(&self) -> Self {
126        match self {
127            Self::Task(n) => Self::Task(n.clone()),
128            Self::Condition(n) => Self::Condition(n.clone()),
129            Self::Barrier(n) => Self::Barrier(n.clone()),
130            Self::Parallel(n) => Self::Parallel(n.clone()),
131            Self::External(n) => Self::External(n.clone()),
132            Self::ExternalLeaf(n) => Self::ExternalLeaf(n.clone()),
133            Self::Subgraph(n) => Self::Subgraph(n.clone()),
134        }
135    }
136}
137
138// ─── TaskNode ────────────────────────────────────────────────
139
140/// Task 节点回调类型别名(向后兼容 NodeContext)。
141pub type TaskFn<S> = Arc<dyn Fn(&mut NodeContext<'_, S>) -> Result<(), GraphError> + Send + Sync>;
142
143/// 自定义逻辑节点。
144#[derive(Clone)]
145pub struct TaskNode<S: WorkflowState = State> {
146    pub name: String,
147    pub func: TaskFn<S>,
148}
149
150impl<S: WorkflowState> TaskNode<S> {
151    pub fn new(
152        name: impl Into<String>,
153        func: impl Fn(&mut NodeContext<'_, S>) -> Result<(), GraphError> + Send + Sync + 'static,
154    ) -> Self {
155        Self {
156            name: name.into(),
157            func: Arc::new(func),
158        }
159    }
160}
161
162/// TaskNode 实现 FlowNode(向后兼容 — 使用 NodeContext)。
163///
164/// 未来将迁移到 LeafNode + LeafContext。
165#[async_trait]
166impl<S: WorkflowState> FlowNode<S> for TaskNode<S> {
167    async fn execute(&self, ctx: &mut NodeContext<'_, S>) -> Result<(), GraphError> {
168        (self.func)(ctx)
169    }
170}
171
172// ─── ConditionNode ───────────────────────────────────────────
173
174/// 条件分支回调类型别名。
175pub type BranchCondition<S> = Arc<dyn Fn(&S) -> bool + Send + Sync>;
176
177/// 条件分支节点。
178#[derive(Clone)]
179pub struct ConditionNode<S: WorkflowState = State> {
180    pub name: String,
181    pub branches: Vec<(String, BranchCondition<S>)>,
182}
183
184impl<S: WorkflowState> ConditionNode<S> {
185    pub fn builder(name: impl Into<String>) -> ConditionNodeBuilder<S> {
186        ConditionNodeBuilder {
187            name: name.into(),
188            branches: Vec::new(),
189        }
190    }
191}
192
193/// ConditionNode 构建器。
194pub struct ConditionNodeBuilder<S: WorkflowState = State> {
195    name: String,
196    branches: Vec<(String, BranchCondition<S>)>,
197}
198
199impl<S: WorkflowState> ConditionNodeBuilder<S> {
200    pub fn branch(
201        mut self,
202        target: impl Into<String>,
203        condition: impl Fn(&S) -> bool + Send + Sync + 'static,
204    ) -> Self {
205        self.branches.push((target.into(), Arc::new(condition)));
206        self
207    }
208
209    pub fn build(self) -> ConditionNode<S> {
210        ConditionNode {
211            name: self.name,
212            branches: self.branches,
213        }
214    }
215}
216
217/// ConditionNode 实现 LeafNode(推荐路径 — 只读 state + goto)。
218#[async_trait]
219impl<S: WorkflowState> LeafNode<S> for ConditionNode<S> {
220    async fn execute(&self, ctx: &mut LeafContext<'_, S>) -> Result<(), GraphError> {
221        let state = ctx.state();
222        for (target, condition) in &self.branches {
223            if condition(state) {
224                ctx.goto(target);
225                return Ok(());
226            }
227        }
228        Ok(())
229    }
230}
231
232/// ConditionNode 实现 FlowNode(向后兼容 — 使用 NodeContext)。
233#[async_trait]
234impl<S: WorkflowState> FlowNode<S> for ConditionNode<S> {
235    async fn execute(&self, ctx: &mut NodeContext<'_, S>) -> Result<(), GraphError> {
236        let state = ctx.state();
237        for (target, condition) in &self.branches {
238            if condition(state) {
239                ctx.goto(target);
240                return Ok(());
241            }
242        }
243        Ok(())
244    }
245}
246
247// ─── Backward Compatibility Alias ─────────────────────────────
248
249/// 向后兼容别名 — `GraphNode` → `FlowNode`。
250pub type GraphNode<S> = dyn FlowNode<S>;