Skip to main content

lellm_graph/state/
workflow_state.rs

1//! WorkflowState + Mutation + MergeStrategy — Typed State 框架。
2//!
3//! v0.4+ 终局:砸碎 `HashMap<String, Value>`,引入编译期类型安全。
4//!
5//! 核心原则:
6//! - 状态是强类型 struct,不是动态 HashMap
7//! - 状态变更通过 Mutation(确定性命令),不是节点直接写
8//! - Mutation 自己知道如何修改 State(CQRS / Event Sourcing 职责划分)
9//! - 并行合并规则由 Graph 层的 MergeStrategy 决定,不是 State 内建属性
10//! - Checkpoint 采用 Snapshot 实现快速恢复;Mutation Log 用于审计、调试和可选的
11//!   确定性重放,两者共同构成完整的恢复与追踪体系
12//!
13//! Graph 层提供 trait 框架,各业务层(agent/mcp/...)定义自己的 State + Mutation。
14
15use std::fmt::Debug;
16
17// ─── StateMutation ──────────────────────────────────────────────
18
19/// 状态变更命令 — 描述一次对 State 的确定性修改。
20///
21/// Mutation 自己知道如何修改对应的 State(`apply(self, &mut S)`)。
22/// State 只是数据,Mutation 是变更逻辑,Executor 负责调度。
23///
24/// # 设计原则
25///
26/// - **Command 而非 Patch**:`AppendMessage` 而非 `SetMessages`
27/// - **Enum 分发**:顶层 enum 只做一层 match,具体逻辑在各 variant 的 `apply()` 中
28/// - **无 Serialize 强制**:只有需要 Replay 的运行时才加 `Serialize` bound
29pub trait StateMutation<S>: Sized + Send + Sync + Debug {
30    /// 将此 Mutation 应用到目标 State。
31    ///
32    /// 这是 Mutation 的核心职责:每个 variant 独立实现,
33    /// 顶层 enum 只做一层 dispatch。
34    fn apply(self, state: &mut S);
35
36    /// 将此 Mutation 合并到另一个同类型 Mutation 中(可选)。
37    ///
38    /// 用于批量场景:多个 Mutation 合并为一个,减少 apply 次数。
39    /// 默认返回 `None` 表示不可合并。
40    fn combine(self, _other: Self) -> Option<Self> {
41        None
42    }
43}
44
45// ─── WorkflowState ──────────────────────────────────────────────
46
47/// 工作流状态 — 编译期类型安全的状态容器。
48///
49/// 替代 `HashMap<String, Value>` 动态模型。
50/// 每个工作流定义自己的 State struct 和 Mutation enum,
51/// 实现此 trait 以声明关联类型。
52///
53/// **State 只是数据。** 状态变更逻辑在 [`StateMutation`] trait 中。
54/// **Merge 职责已从 `WorkflowState` 剥离到 [`MergeStrategy`]。**
55/// **Checkpoint 采用 Projection 模式** — Runtime State 可包含不可序列化字段
56/// (如 `Arc<dyn ...>`, `Sender`, `Cache`),Checkpoint 只序列化必要字段。
57///
58/// # 示例
59///
60/// ```rust,ignore
61/// // State 只是数据
62/// pub struct AgentState {
63///     pub messages: Vec<Message>,
64///     pub iterations: usize,
65///     pub output_tokens: usize,
66///     pub cache: Arc<dyn ToolCatalog>,  // 不可序列化
67/// }
68///
69/// // 可序列化的 Checkpoint 投影
70/// #[derive(Serialize, Deserialize)]
71/// pub struct AgentCheckpoint {
72///     pub messages: Vec<Message>,
73///     pub iterations: usize,
74///     pub output_tokens: usize,
75///     // 不包含 cache
76/// }
77///
78/// // Mutation 自己知道怎么改 State
79/// pub enum AgentMutation {
80///     AppendMessage(Message),
81///     IncrementIteration,
82///     RecordOutputTokens(usize),
83/// }
84///
85/// impl StateMutation<AgentState> for AgentMutation {
86///     fn apply(self, state: &mut AgentState) {
87///         match self {
88///             AgentMutation::AppendMessage(msg) => state.messages.push(msg),
89///             AgentMutation::IncrementIteration => state.iterations += 1,
90///             AgentMutation::RecordOutputTokens(n) => state.output_tokens += n,
91///         }
92///     }
93/// }
94///
95/// // WorkflowState 声明 Checkpoint 和 Mutation 关联类型
96/// impl WorkflowState for AgentState {
97///     type Checkpoint = AgentCheckpoint;
98///     type Mutation = AgentMutation;
99///
100///     fn snapshot(&self) -> AgentCheckpoint {
101///         AgentCheckpoint {
102///             messages: self.messages.clone(),
103///             iterations: self.iterations,
104///             output_tokens: self.output_tokens,
105///         }
106///     }
107///
108///     fn restore(checkpoint: AgentCheckpoint) -> Self {
109///         AgentState {
110///             messages: checkpoint.messages,
111///             iterations: checkpoint.iterations,
112///             output_tokens: checkpoint.output_tokens,
113///             cache: Arc::new(ToolCatalog::default()),  // 重建
114///         }
115///     }
116/// }
117/// ```
118pub trait WorkflowState: Clone + Send + Sync {
119    /// 可序列化的 Checkpoint 快照(projection,不是 raw state)。
120    ///
121    /// Runtime State 可以包含不可序列化字段(`Arc<dyn ...>`, `Sender`, `Cache`),
122    /// Checkpoint 只序列化必要字段。这是强制的 Projection 模式。
123    type Checkpoint: serde::Serialize + serde::de::DeserializeOwned + Clone + Send;
124
125    /// 与此状态关联的 Mutation 类型。
126    type Mutation: StateMutation<Self>;
127
128    /// 创建 checkpoint 快照 — 只序列化必要字段。
129    ///
130    /// 这是 Projection 的核心:开发者必须决定哪些字段需要持久化。
131    /// 编译期保证 `Checkpoint` 可序列化。
132    fn snapshot(&self) -> Self::Checkpoint;
133
134    /// 从 checkpoint 恢复运行时状态。
135    ///
136    /// 恢复时明确哪些字段从 checkpoint 加载,哪些需要重建。
137    fn restore(checkpoint: Self::Checkpoint) -> Self;
138
139    /// 批量应用 Mutation — 唯一公开入口。
140    ///
141    /// 默认实现:逐个调用 [`StateMutation::apply`]。
142    /// 未来可覆盖为 Transaction 语义(begin → validate → apply → commit/rollback)。
143    fn apply_batch(&mut self, mutations: impl IntoIterator<Item = Self::Mutation>) {
144        for mutation in mutations {
145            mutation.apply(self);
146        }
147    }
148
149    /// 创建默认/初始状态。
150    fn initial() -> Self
151    where
152        Self: Default,
153    {
154        Self::default()
155    }
156}
157
158// ─── MergeStrategy ──────────────────────────────────────────────
159
160/// 并行分支合并策略 — Graph 层职责,非 State 内建属性。
161///
162/// 将多个并行分支执行后产生的状态合并为一个。
163/// 合并规则由 Graph 编排层决定,而非 State 自身。
164///
165/// # 职责边界
166///
167/// - **State** = 数据
168/// - **MergeStrategy** = 并行语义
169/// - **ExecutionEngine** = 调度 + commit
170/// - **Node** = Mutation Producer
171///
172/// # 示例
173///
174/// ```rust,ignore
175/// // 为 AgentState 定义合并策略
176/// pub struct AgentStateMerge;
177/// impl MergeStrategy<AgentState> for AgentStateMerge {
178///     fn merge(branches: Vec<AgentState>) -> Result<AgentState, WorkflowError> {
179///         // messages: concat, iterations: max, tokens: sum
180///     }
181/// }
182///
183/// // ParallelNode 使用
184/// ParallelNode::builder()
185///     .merge_strategy(AgentStateMerge)
186///     .branch("search", search_node)
187///     .branch("analyze", analyze_node)
188///     .build();
189/// ```
190pub trait MergeStrategy<S>: Send + Sync {
191    /// 合并多个并行分支的状态。
192    ///
193    /// `branches` 按注册顺序排列(与 ParallelNode 的 branch 注册顺序一致)。
194    fn merge(branches: Vec<S>) -> Result<S, WorkflowError>;
195
196    /// 创建策略的默认实例(供 ParallelNodeBuilder 使用)。
197    /// 对于无状态策略(如 StateMerge、LastWriteWins),直接返回自身。
198    fn default_instance() -> Self;
199}
200
201/// 默认合并策略 — 最后一个分支获胜。
202///
203/// 适用于大多数场景:各分支从同一 base 出发,
204/// 最后一个分支的写入覆盖前面的。
205pub struct LastWriteWins;
206
207impl<S> MergeStrategy<S> for LastWriteWins {
208    fn merge(branches: Vec<S>) -> Result<S, WorkflowError> {
209        branches
210            .into_iter()
211            .last()
212            .ok_or_else(|| WorkflowError::MergeConflict("no branches to merge".into()))
213    }
214
215    fn default_instance() -> Self {
216        LastWriteWins
217    }
218}
219
220// ─── WorkflowError ──────────────────────────────────────────────
221
222/// 工作流状态操作错误。
223#[derive(Debug, thiserror::Error)]
224pub enum WorkflowError {
225    /// 状态合并冲突
226    #[error("state merge conflict: {0}")]
227    MergeConflict(String),
228
229    /// Mutation 应用失败
230    #[error("failed to apply mutation: {0}")]
231    ApplyFailed(String),
232
233    /// 状态序列化/反序列化失败
234    #[error("state serialization error: {0}")]
235    Serialization(#[from] serde_json::Error),
236}