lellm_graph/workflow_state.rs
1//! WorkflowState + Mutation + MergeStrategy — Typed State 框架。
2//!
3//! v0.4+ 终局:砸碎 `HashMap<String, Value>`,引入编译期类型安全。
4//!
5//! 核心原则:
6//! - 状态是强类型 struct,不是动态 HashMap
7//! - 状态变更通过 Mutation(确定性命令),不是节点直接写
8//! - Mutation 自己知道如何修改 State(CQRS / Event Sourcing 职责划分)
9//! - 并行合并规则由 Graph 层的 MergeStrategy 决定,不是 State 内建属性
10//! - Checkpoint 采用 Snapshot 实现快速恢复;Mutation Log 用于审计、调试和可选的
11//! 确定性重放,两者共同构成完整的恢复与追踪体系
12//!
13//! Graph 层提供 trait 框架,各业务层(agent/mcp/...)定义自己的 State + Mutation。
14
15use std::fmt::Debug;
16
17// ─── StateMutation ──────────────────────────────────────────────
18
19/// 状态变更命令 — 描述一次对 State 的确定性修改。
20///
21/// Mutation 自己知道如何修改对应的 State(`apply(self, &mut S)`)。
22/// State 只是数据,Mutation 是变更逻辑,Executor 负责调度。
23///
24/// # 设计原则
25///
26/// - **Command 而非 Patch**:`AppendMessage` 而非 `SetMessages`
27/// - **Enum 分发**:顶层 enum 只做一层 match,具体逻辑在各 variant 的 `apply()` 中
28/// - **无 Serialize 强制**:只有需要 Replay 的运行时才加 `Serialize` bound
29pub trait StateMutation<S>: Sized + Send + Sync + Debug {
30 /// 将此 Mutation 应用到目标 State。
31 ///
32 /// 这是 Mutation 的核心职责:每个 variant 独立实现,
33 /// 顶层 enum 只做一层 dispatch。
34 fn apply(self, state: &mut S);
35
36 /// 将此 Mutation 合并到另一个同类型 Mutation 中(可选)。
37 ///
38 /// 用于批量场景:多个 Mutation 合并为一个,减少 apply 次数。
39 /// 默认返回 `None` 表示不可合并。
40 fn combine(self, _other: Self) -> Option<Self> {
41 None
42 }
43}
44
45// ─── WorkflowState ──────────────────────────────────────────────
46
47/// 工作流状态 — 编译期类型安全的状态容器。
48///
49/// 替代 `HashMap<String, Value>` 动态模型。
50/// 每个工作流定义自己的 State struct 和 Mutation enum,
51/// 实现此 trait 以声明关联类型。
52///
53/// **State 只是数据。** 状态变更逻辑在 [`StateMutation`] trait 中。
54/// **Merge 职责已从 `WorkflowState` 剥离到 [`MergeStrategy`]。**
55///
56/// # 示例
57///
58/// ```rust,ignore
59/// // State 只是数据
60/// pub struct AgentState {
61/// pub messages: Vec<Message>,
62/// pub iterations: usize,
63/// pub output_tokens: usize,
64/// }
65///
66/// // Mutation 自己知道怎么改 State
67/// pub enum AgentMutation {
68/// AppendMessage(Message),
69/// IncrementIteration,
70/// RecordOutputTokens(usize),
71/// }
72///
73/// impl StateMutation<AgentState> for AgentMutation {
74/// fn apply(self, state: &mut AgentState) {
75/// match self {
76/// AgentMutation::AppendMessage(msg) => state.messages.push(msg),
77/// AgentMutation::IncrementIteration => state.iterations += 1,
78/// AgentMutation::RecordOutputTokens(n) => state.output_tokens += n,
79/// }
80/// }
81/// }
82///
83/// // WorkflowState 只声明关联类型 — 没有 apply()
84/// impl WorkflowState for AgentState {
85/// type Mutation = AgentMutation;
86/// }
87/// ```
88pub trait WorkflowState:
89 Clone + Send + Sync + serde::Serialize + serde::de::DeserializeOwned
90{
91 /// 与此状态关联的 Mutation 类型。
92 type Mutation: StateMutation<Self>;
93
94 /// 批量应用 Mutation — 唯一公开入口。
95 ///
96 /// 默认实现:逐个调用 [`StateMutation::apply`]。
97 /// 未来可覆盖为 Transaction 语义(begin → validate → apply → commit/rollback)。
98 fn apply_batch(&mut self, mutations: impl IntoIterator<Item = Self::Mutation>) {
99 for mutation in mutations {
100 mutation.apply(self);
101 }
102 }
103
104 /// 创建默认/初始状态。
105 fn initial() -> Self
106 where
107 Self: Default,
108 {
109 Self::default()
110 }
111}
112
113// ─── MergeStrategy ──────────────────────────────────────────────
114
115/// 并行分支合并策略 — Graph 层职责,非 State 内建属性。
116///
117/// 将多个并行分支执行后产生的状态合并为一个。
118/// 合并规则由 Graph 编排层决定,而非 State 自身。
119///
120/// # 职责边界
121///
122/// - **State** = 数据
123/// - **MergeStrategy** = 并行语义
124/// - **ExecutionEngine** = 调度 + commit
125/// - **Node** = Mutation Producer
126///
127/// # 示例
128///
129/// ```rust,ignore
130/// // 为 AgentState 定义合并策略
131/// pub struct AgentStateMerge;
132/// impl MergeStrategy<AgentState> for AgentStateMerge {
133/// fn merge(branches: Vec<AgentState>) -> Result<AgentState, WorkflowError> {
134/// // messages: concat, iterations: max, tokens: sum
135/// }
136/// }
137///
138/// // ParallelNode 使用
139/// ParallelNode::builder()
140/// .merge_strategy(AgentStateMerge)
141/// .branch("search", search_node)
142/// .branch("analyze", analyze_node)
143/// .build();
144/// ```
145pub trait MergeStrategy<S>: Send + Sync {
146 /// 合并多个并行分支的状态。
147 ///
148 /// `branches` 按注册顺序排列(与 ParallelNode 的 branch 注册顺序一致)。
149 fn merge(branches: Vec<S>) -> Result<S, WorkflowError>;
150
151 /// 创建策略的默认实例(供 ParallelNodeBuilder 使用)。
152 /// 对于无状态策略(如 StateMerge、LastWriteWins),直接返回自身。
153 fn default_instance() -> Self;
154}
155
156/// 默认合并策略 — 最后一个分支获胜。
157///
158/// 适用于大多数场景:各分支从同一 base 出发,
159/// 最后一个分支的写入覆盖前面的。
160pub struct LastWriteWins;
161
162impl<S> MergeStrategy<S> for LastWriteWins {
163 fn merge(branches: Vec<S>) -> Result<S, WorkflowError> {
164 branches
165 .into_iter()
166 .last()
167 .ok_or_else(|| WorkflowError::MergeConflict("no branches to merge".into()))
168 }
169
170 fn default_instance() -> Self {
171 LastWriteWins
172 }
173}
174
175// ─── WorkflowError ──────────────────────────────────────────────
176
177/// 工作流状态操作错误。
178#[derive(Debug, thiserror::Error)]
179pub enum WorkflowError {
180 /// 状态合并冲突
181 #[error("state merge conflict: {0}")]
182 MergeConflict(String),
183
184 /// Mutation 应用失败
185 #[error("failed to apply mutation: {0}")]
186 ApplyFailed(String),
187
188 /// 状态序列化/反序列化失败
189 #[error("state serialization error: {0}")]
190 Serialization(#[from] serde_json::Error),
191}