lellm_graph/state/workflow_state.rs
1//! WorkflowState + Mutation + MergeStrategy — Typed State 框架。
2//!
3//! v0.4+ 终局:砸碎 `HashMap<String, Value>`,引入编译期类型安全。
4//!
5//! 核心原则:
6//! - 状态是强类型 struct,不是动态 HashMap
7//! - 状态变更通过 Mutation(确定性命令),不是节点直接写
8//! - Mutation 自己知道如何修改 State(CQRS / Event Sourcing 职责划分)
9//! - 并行合并规则由 Graph 层的 MergeStrategy 决定,不是 State 内建属性
10//! - Checkpoint 采用 Snapshot 实现快速恢复;Mutation Log 用于审计、调试和可选的
11//! 确定性重放,两者共同构成完整的恢复与追踪体系
12//!
13//! Graph 层提供 trait 框架,各业务层(agent/mcp/...)定义自己的 State + Mutation。
14
15use std::fmt::Debug;
16
17// ─── StateMutation ──────────────────────────────────────────────
18
19/// 状态变更命令 — 描述一次对 State 的确定性修改。
20///
21/// Mutation 自己知道如何修改对应的 State(`apply(self, &mut S)`)。
22/// State 只是数据,Mutation 是变更逻辑,Executor 负责调度。
23///
24/// # 设计原则
25///
26/// - **Command 而非 Patch**:`AppendMessage` 而非 `SetMessages`
27/// - **Enum 分发**:顶层 enum 只做一层 match,具体逻辑在各 variant 的 `apply()` 中
28/// - **无 Serialize 强制**:只有需要 Replay 的运行时才加 `Serialize` bound
29pub trait StateMutation<S>: Sized + Send + Sync + Debug {
30 /// 将此 Mutation 应用到目标 State。
31 ///
32 /// 这是 Mutation 的核心职责:每个 variant 独立实现,
33 /// 顶层 enum 只做一层 dispatch。
34 fn apply(self, state: &mut S);
35
36 /// 将此 Mutation 合并到另一个同类型 Mutation 中(可选)。
37 ///
38 /// 用于批量场景:多个 Mutation 合并为一个,减少 apply 次数。
39 /// 默认返回 `None` 表示不可合并。
40 fn combine(self, _other: Self) -> Option<Self> {
41 None
42 }
43}
44
45// ─── WorkflowState ──────────────────────────────────────────────
46
47/// 工作流状态 — 编译期类型安全的状态容器。
48///
49/// 替代 `HashMap<String, Value>` 动态模型。
50/// 每个工作流定义自己的 State struct 和 Mutation enum,
51/// 实现此 trait 以声明关联类型。
52///
53/// **State 只是数据。** 状态变更逻辑在 [`StateMutation`] trait 中。
54/// **Merge 职责已从 `WorkflowState` 剥离到 [`MergeStrategy`]。**
55/// **Checkpoint 采用 Projection 模式** — Runtime State 可包含不可序列化字段
56/// (如 `Arc<dyn ...>`, `Sender`, `Cache`),Checkpoint 只序列化必要字段。
57///
58/// # 示例
59///
60/// ```rust,ignore
61/// // State 只是数据
62/// pub struct AgentState {
63/// pub messages: Vec<Message>,
64/// pub iterations: usize,
65/// pub output_tokens: usize,
66/// pub cache: Arc<dyn ToolCatalog>, // 不可序列化
67/// }
68///
69/// // 可序列化的 Checkpoint 投影
70/// #[derive(Serialize, Deserialize)]
71/// pub struct AgentCheckpoint {
72/// pub messages: Vec<Message>,
73/// pub iterations: usize,
74/// pub output_tokens: usize,
75/// // 不包含 cache
76/// }
77///
78/// // Mutation 自己知道怎么改 State
79/// pub enum AgentMutation {
80/// AppendMessage(Message),
81/// IncrementIteration,
82/// RecordOutputTokens(usize),
83/// }
84///
85/// impl StateMutation<AgentState> for AgentMutation {
86/// fn apply(self, state: &mut AgentState) {
87/// match self {
88/// AgentMutation::AppendMessage(msg) => state.messages.push(msg),
89/// AgentMutation::IncrementIteration => state.iterations += 1,
90/// AgentMutation::RecordOutputTokens(n) => state.output_tokens += n,
91/// }
92/// }
93/// }
94///
95/// // WorkflowState 声明 Checkpoint 和 Mutation 关联类型
96/// impl WorkflowState for AgentState {
97/// type Checkpoint = AgentCheckpoint;
98/// type Mutation = AgentMutation;
99///
100/// fn snapshot(&self) -> AgentCheckpoint {
101/// AgentCheckpoint {
102/// messages: self.messages.clone(),
103/// iterations: self.iterations,
104/// output_tokens: self.output_tokens,
105/// }
106/// }
107///
108/// fn restore(checkpoint: AgentCheckpoint) -> Self {
109/// AgentState {
110/// messages: checkpoint.messages,
111/// iterations: checkpoint.iterations,
112/// output_tokens: checkpoint.output_tokens,
113/// cache: Arc::new(ToolCatalog::default()), // 重建
114/// }
115/// }
116/// }
117/// ```
118pub trait WorkflowState: Clone + Send + Sync {
119 /// 可序列化的 Checkpoint 快照(projection,不是 raw state)。
120 ///
121 /// Runtime State 可以包含不可序列化字段(`Arc<dyn ...>`, `Sender`, `Cache`),
122 /// Checkpoint 只序列化必要字段。这是强制的 Projection 模式。
123 type Checkpoint: serde::Serialize + serde::de::DeserializeOwned + Clone + Send;
124
125 /// 与此状态关联的 Mutation 类型。
126 type Mutation: StateMutation<Self>;
127
128 /// 创建 checkpoint 快照 — 只序列化必要字段。
129 ///
130 /// 这是 Projection 的核心:开发者必须决定哪些字段需要持久化。
131 /// 编译期保证 `Checkpoint` 可序列化。
132 fn snapshot(&self) -> Self::Checkpoint;
133
134 /// 从 checkpoint 恢复运行时状态。
135 ///
136 /// 恢复时明确哪些字段从 checkpoint 加载,哪些需要重建。
137 fn restore(checkpoint: Self::Checkpoint) -> Self;
138
139 /// 批量应用 Mutation — 唯一公开入口。
140 ///
141 /// 默认实现:逐个调用 [`StateMutation::apply`]。
142 /// 未来可覆盖为 Transaction 语义(begin → validate → apply → commit/rollback)。
143 fn apply_batch(&mut self, mutations: impl IntoIterator<Item = Self::Mutation>) {
144 for mutation in mutations {
145 mutation.apply(self);
146 }
147 }
148
149 /// 创建默认/初始状态。
150 fn initial() -> Self
151 where
152 Self: Default,
153 {
154 Self::default()
155 }
156}
157
158// ─── MergeStrategy ──────────────────────────────────────────────
159
160/// 并行分支合并策略 — Graph 层职责,非 State 内建属性。
161///
162/// 将多个并行分支执行后产生的状态合并为一个。
163/// 合并规则由 Graph 编排层决定,而非 State 自身。
164///
165/// # 职责边界
166///
167/// - **State** = 数据
168/// - **MergeStrategy** = 并行语义
169/// - **ExecutionEngine** = 调度 + commit
170/// - **Node** = Mutation Producer
171///
172/// # 示例
173///
174/// ```rust,ignore
175/// // 为 AgentState 定义合并策略
176/// pub struct AgentStateMerge;
177/// impl MergeStrategy<AgentState> for AgentStateMerge {
178/// fn merge(branches: Vec<AgentState>) -> Result<AgentState, WorkflowError> {
179/// // messages: concat, iterations: max, tokens: sum
180/// }
181/// }
182///
183/// // ParallelNode 使用
184/// ParallelNode::builder()
185/// .merge_strategy(AgentStateMerge)
186/// .branch("search", search_node)
187/// .branch("analyze", analyze_node)
188/// .build();
189/// ```
190pub trait MergeStrategy<S>: Send + Sync {
191 /// 合并多个并行分支的状态。
192 ///
193 /// `branches` 按注册顺序排列(与 ParallelNode 的 branch 注册顺序一致)。
194 fn merge(branches: Vec<S>) -> Result<S, WorkflowError>;
195
196 /// 创建策略的默认实例(供 ParallelNodeBuilder 使用)。
197 /// 对于无状态策略(如 StateMerge、LastWriteWins),直接返回自身。
198 fn default_instance() -> Self;
199}
200
201/// 默认合并策略 — 最后一个分支获胜。
202///
203/// 适用于大多数场景:各分支从同一 base 出发,
204/// 最后一个分支的写入覆盖前面的。
205pub struct LastWriteWins;
206
207impl<S> MergeStrategy<S> for LastWriteWins {
208 fn merge(branches: Vec<S>) -> Result<S, WorkflowError> {
209 branches
210 .into_iter()
211 .last()
212 .ok_or_else(|| WorkflowError::MergeConflict("no branches to merge".into()))
213 }
214
215 fn default_instance() -> Self {
216 LastWriteWins
217 }
218}
219
220// ─── WorkflowError ──────────────────────────────────────────────
221
222/// 工作流状态操作错误。
223#[derive(Debug, thiserror::Error)]
224pub enum WorkflowError {
225 /// 状态合并冲突
226 #[error("state merge conflict: {0}")]
227 MergeConflict(String),
228
229 /// Mutation 应用失败
230 #[error("failed to apply mutation: {0}")]
231 ApplyFailed(String),
232
233 /// 状态序列化/反序列化失败
234 #[error("state serialization error: {0}")]
235 Serialization(#[from] serde_json::Error),
236}