lellm_graph/workflow_state.rs
1//! WorkflowState + Effect + MergeStrategy — Typed State 框架。
2//!
3//! v0.4+ 终局:砸碎 `HashMap<String, Value>`,引入编译期类型安全。
4//!
5//! 核心原则:
6//! - 状态是强类型 struct,不是动态 HashMap
7//! - 状态变更通过 Effect(领域事件),不是节点直接写
8//! - 并行合并规则由 Graph 层的 MergeStrategy 决定,不是 State 内建属性
9//! - Checkpoint = Effect Log,支持确定性重放
10//!
11//! Graph 层提供 trait 框架,各业务层(agent/mcp/...)定义自己的 State + Effect。
12
13// ─── Effect ─────────────────────────────────────────────────────
14
15/// 效果 — 描述一次状态转换的领域事件。
16///
17/// Effect 是不可变的、可序列化的、自包含的。
18/// 状态通过 `apply(effect)` 变更,而非直接修改。
19pub trait Effect: Sized + Send + Sync + serde::Serialize + serde::de::DeserializeOwned {
20 /// 将此 Effect 合并到另一个同类型 Effect 中(可选)。
21 ///
22 /// 用于批量场景:多个 Effect 合并为一个,减少 apply 次数。
23 /// 默认返回 `None` 表示不可合并。
24 fn combine(self, _other: Self) -> Option<Self> {
25 None
26 }
27}
28
29// ─── WorkflowState ──────────────────────────────────────────────
30
31/// 工作流状态 — 编译期类型安全的状态容器。
32///
33/// 替代 `HashMap<String, Value>` 动态模型。
34/// 每个工作流定义自己的 State struct 和 Effect enum,
35/// 实现此 trait 以声明状态转换规则。
36///
37/// **Merge 职责已从 `WorkflowState` 剥离到 [`MergeStrategy`]。**
38/// 并行合并是 Graph 层的执行语义,不是 State 层的内建属性。
39///
40/// # 示例
41///
42/// ```rust,ignore
43/// pub enum AgentEffect {
44/// AppendMessage(Message),
45/// IncrementIteration,
46/// RecordOutputTokens(usize),
47/// }
48///
49/// pub struct AgentState {
50/// pub messages: Vec<Message>,
51/// pub iterations: usize,
52/// pub output_tokens: usize,
53/// }
54///
55/// impl WorkflowState for AgentState {
56/// type Effect = AgentEffect;
57///
58/// fn apply(&mut self, effect: Self::Effect) {
59/// match effect {
60/// AgentEffect::AppendMessage(msg) => self.messages.push(msg),
61/// AgentEffect::IncrementIteration => self.iterations += 1,
62/// AgentEffect::RecordOutputTokens(n) => self.output_tokens += n,
63/// }
64/// }
65/// }
66/// ```
67pub trait WorkflowState:
68 Clone + Send + Sync + serde::Serialize + serde::de::DeserializeOwned
69{
70 /// 与此状态关联的 Effect 类型。
71 type Effect: Effect;
72
73 /// 应用一个 Effect 到状态。
74 fn apply(&mut self, effect: Self::Effect);
75
76 /// 批量应用 Effect(默认逐个 apply)。
77 fn apply_batch(&mut self, effects: impl IntoIterator<Item = Self::Effect>) {
78 for effect in effects {
79 self.apply(effect);
80 }
81 }
82
83 /// 应用一个 BranchState 变更记录到状态(backward compat)。
84 ///
85 /// 默认实现:no-op(纯 Effect 驱动的状态不需要此方法)。
86 /// `State`(HashMap wrapper)覆盖此方法,将 ChangeRecord 转换为 StateEffect。
87 fn apply_branch_change(&mut self, _change: &crate::branch_state::ChangeRecord) {
88 // no-op — pure effect-driven states don't use BranchState changes
89 }
90
91 /// 创建默认/初始状态。
92 fn initial() -> Self
93 where
94 Self: Default,
95 {
96 Self::default()
97 }
98}
99
100// ─── MergeStrategy ──────────────────────────────────────────────
101
102/// 并行分支合并策略 — Graph 层职责,非 State 内建属性。
103///
104/// 将多个并行分支执行后产生的状态合并为一个。
105/// 合并规则由 Graph 编排层决定,而非 State 自身。
106///
107/// # 职责边界
108///
109/// - **State** = 数据
110/// - **BranchState** = Overlay
111/// - **ChangeLog** = Observability + Checkpoint
112/// - **MergeStrategy** = 并行语义
113/// - **Executor** = 调度
114/// - **Node** = Effect Producer
115///
116/// # 示例
117///
118/// ```rust,ignore
119/// // 为 AgentState 定义合并策略
120/// pub struct AgentStateMerge;
121/// impl MergeStrategy<AgentState> for AgentStateMerge {
122/// fn merge(branches: Vec<AgentState>) -> Result<AgentState, WorkflowError> {
123/// // messages: concat, iterations: max, tokens: sum
124/// }
125/// }
126///
127/// // ParallelNode 使用
128/// ParallelNode::builder()
129/// .merge_strategy(AgentStateMerge)
130/// .branch("search", search_node)
131/// .branch("analyze", analyze_node)
132/// .build();
133/// ```
134pub trait MergeStrategy<S>: Send + Sync {
135 /// 合并多个并行分支的状态。
136 ///
137 /// `branches` 按注册顺序排列(与 ParallelNode 的 branch 注册顺序一致)。
138 fn merge(branches: Vec<S>) -> Result<S, WorkflowError>;
139
140 /// 创建策略的默认实例(供 ParallelNodeBuilder 使用)。
141 /// 对于无状态策略(如 StateMerge、LastWriteWins),直接返回自身。
142 fn default_instance() -> Self;
143}
144
145/// 默认合并策略 — 最后一个分支获胜。
146///
147/// 适用于大多数场景:各分支从同一 base 出发,
148/// 最后一个分支的写入覆盖前面的。
149pub struct LastWriteWins;
150
151impl<S> MergeStrategy<S> for LastWriteWins {
152 fn merge(branches: Vec<S>) -> Result<S, WorkflowError> {
153 branches
154 .into_iter()
155 .last()
156 .ok_or_else(|| WorkflowError::MergeConflict("no branches to merge".into()))
157 }
158
159 fn default_instance() -> Self {
160 LastWriteWins
161 }
162}
163
164// ─── WorkflowError ──────────────────────────────────────────────
165
166/// 工作流状态操作错误。
167#[derive(Debug, thiserror::Error)]
168pub enum WorkflowError {
169 /// 状态合并冲突
170 #[error("state merge conflict: {0}")]
171 MergeConflict(String),
172
173 /// Effect 应用失败
174 #[error("failed to apply effect: {0}")]
175 ApplyFailed(String),
176
177 /// 状态序列化/反序列化失败
178 #[error("state serialization error: {0}")]
179 Serialization(#[from] serde_json::Error),
180}