Skip to main content

lellm_graph/
workflow_state.rs

1//! WorkflowState + Mutation + MergeStrategy — Typed State 框架。
2//!
3//! v0.4+ 终局:砸碎 `HashMap<String, Value>`,引入编译期类型安全。
4//!
5//! 核心原则:
6//! - 状态是强类型 struct,不是动态 HashMap
7//! - 状态变更通过 Mutation(确定性命令),不是节点直接写
8//! - Mutation 自己知道如何修改 State(CQRS / Event Sourcing 职责划分)
9//! - 并行合并规则由 Graph 层的 MergeStrategy 决定,不是 State 内建属性
10//! - Checkpoint 采用 Snapshot 实现快速恢复;Mutation Log 用于审计、调试和可选的
11//!   确定性重放,两者共同构成完整的恢复与追踪体系
12//!
13//! Graph 层提供 trait 框架,各业务层(agent/mcp/...)定义自己的 State + Mutation。
14
15use std::fmt::Debug;
16
17// ─── StateMutation ──────────────────────────────────────────────
18
19/// 状态变更命令 — 描述一次对 State 的确定性修改。
20///
21/// Mutation 自己知道如何修改对应的 State(`apply(self, &mut S)`)。
22/// State 只是数据,Mutation 是变更逻辑,Executor 负责调度。
23///
24/// # 设计原则
25///
26/// - **Command 而非 Patch**:`AppendMessage` 而非 `SetMessages`
27/// - **Enum 分发**:顶层 enum 只做一层 match,具体逻辑在各 variant 的 `apply()` 中
28/// - **无 Serialize 强制**:只有需要 Replay 的运行时才加 `Serialize` bound
29pub trait StateMutation<S>: Sized + Send + Sync + Debug {
30    /// 将此 Mutation 应用到目标 State。
31    ///
32    /// 这是 Mutation 的核心职责:每个 variant 独立实现,
33    /// 顶层 enum 只做一层 dispatch。
34    fn apply(self, state: &mut S);
35
36    /// 将此 Mutation 合并到另一个同类型 Mutation 中(可选)。
37    ///
38    /// 用于批量场景:多个 Mutation 合并为一个,减少 apply 次数。
39    /// 默认返回 `None` 表示不可合并。
40    fn combine(self, _other: Self) -> Option<Self> {
41        None
42    }
43}
44
45// ─── WorkflowState ──────────────────────────────────────────────
46
47/// 工作流状态 — 编译期类型安全的状态容器。
48///
49/// 替代 `HashMap<String, Value>` 动态模型。
50/// 每个工作流定义自己的 State struct 和 Mutation enum,
51/// 实现此 trait 以声明关联类型。
52///
53/// **State 只是数据。** 状态变更逻辑在 [`StateMutation`] trait 中。
54/// **Merge 职责已从 `WorkflowState` 剥离到 [`MergeStrategy`]。**
55///
56/// # 示例
57///
58/// ```rust,ignore
59/// // State 只是数据
60/// pub struct AgentState {
61///     pub messages: Vec<Message>,
62///     pub iterations: usize,
63///     pub output_tokens: usize,
64/// }
65///
66/// // Mutation 自己知道怎么改 State
67/// pub enum AgentMutation {
68///     AppendMessage(Message),
69///     IncrementIteration,
70///     RecordOutputTokens(usize),
71/// }
72///
73/// impl StateMutation<AgentState> for AgentMutation {
74///     fn apply(self, state: &mut AgentState) {
75///         match self {
76///             AgentMutation::AppendMessage(msg) => state.messages.push(msg),
77///             AgentMutation::IncrementIteration => state.iterations += 1,
78///             AgentMutation::RecordOutputTokens(n) => state.output_tokens += n,
79///         }
80///     }
81/// }
82///
83/// // WorkflowState 只声明关联类型 — 没有 apply()
84/// impl WorkflowState for AgentState {
85///     type Mutation = AgentMutation;
86/// }
87/// ```
88pub trait WorkflowState:
89    Clone + Send + Sync + serde::Serialize + serde::de::DeserializeOwned
90{
91    /// 与此状态关联的 Mutation 类型。
92    type Mutation: StateMutation<Self>;
93
94    /// 批量应用 Mutation — 唯一公开入口。
95    ///
96    /// 默认实现:逐个调用 [`StateMutation::apply`]。
97    /// 未来可覆盖为 Transaction 语义(begin → validate → apply → commit/rollback)。
98    fn apply_batch(&mut self, mutations: impl IntoIterator<Item = Self::Mutation>) {
99        for mutation in mutations {
100            mutation.apply(self);
101        }
102    }
103
104    /// 创建默认/初始状态。
105    fn initial() -> Self
106    where
107        Self: Default,
108    {
109        Self::default()
110    }
111}
112
113// ─── MergeStrategy ──────────────────────────────────────────────
114
115/// 并行分支合并策略 — Graph 层职责,非 State 内建属性。
116///
117/// 将多个并行分支执行后产生的状态合并为一个。
118/// 合并规则由 Graph 编排层决定,而非 State 自身。
119///
120/// # 职责边界
121///
122/// - **State** = 数据
123/// - **MergeStrategy** = 并行语义
124/// - **ExecutionEngine** = 调度 + commit
125/// - **Node** = Mutation Producer
126///
127/// # 示例
128///
129/// ```rust,ignore
130/// // 为 AgentState 定义合并策略
131/// pub struct AgentStateMerge;
132/// impl MergeStrategy<AgentState> for AgentStateMerge {
133///     fn merge(branches: Vec<AgentState>) -> Result<AgentState, WorkflowError> {
134///         // messages: concat, iterations: max, tokens: sum
135///     }
136/// }
137///
138/// // ParallelNode 使用
139/// ParallelNode::builder()
140///     .merge_strategy(AgentStateMerge)
141///     .branch("search", search_node)
142///     .branch("analyze", analyze_node)
143///     .build();
144/// ```
145pub trait MergeStrategy<S>: Send + Sync {
146    /// 合并多个并行分支的状态。
147    ///
148    /// `branches` 按注册顺序排列(与 ParallelNode 的 branch 注册顺序一致)。
149    fn merge(branches: Vec<S>) -> Result<S, WorkflowError>;
150
151    /// 创建策略的默认实例(供 ParallelNodeBuilder 使用)。
152    /// 对于无状态策略(如 StateMerge、LastWriteWins),直接返回自身。
153    fn default_instance() -> Self;
154}
155
156/// 默认合并策略 — 最后一个分支获胜。
157///
158/// 适用于大多数场景:各分支从同一 base 出发,
159/// 最后一个分支的写入覆盖前面的。
160pub struct LastWriteWins;
161
162impl<S> MergeStrategy<S> for LastWriteWins {
163    fn merge(branches: Vec<S>) -> Result<S, WorkflowError> {
164        branches
165            .into_iter()
166            .last()
167            .ok_or_else(|| WorkflowError::MergeConflict("no branches to merge".into()))
168    }
169
170    fn default_instance() -> Self {
171        LastWriteWins
172    }
173}
174
175// ─── WorkflowError ──────────────────────────────────────────────
176
177/// 工作流状态操作错误。
178#[derive(Debug, thiserror::Error)]
179pub enum WorkflowError {
180    /// 状态合并冲突
181    #[error("state merge conflict: {0}")]
182    MergeConflict(String),
183
184    /// Mutation 应用失败
185    #[error("failed to apply mutation: {0}")]
186    ApplyFailed(String),
187
188    /// 状态序列化/反序列化失败
189    #[error("state serialization error: {0}")]
190    Serialization(#[from] serde_json::Error),
191}