Skip to main content

lellm_graph/
checkpoint.rs

1//! Checkpoint — 执行恢复的唯一数据源。
2//!
3//! 分层架构:
4//! ```text
5//! Checkpoint<S>           ← Workflow 层,强类型,纯 Snapshot 模型
6//!        │
7//!        ▼ serialize/deserialize
8//! CheckpointCodec<S>      ← 序列化层,对象 ↔ 二进制表示
9//!        │
10//!        ▼
11//! CheckpointBlob           ← 跨 Codec 的统一载体
12//!        │
13//!        ▼ save/load
14//! BlobCheckpointStore      ← 存储层 SPI,bytes in / bytes out
15//!        │
16//!        ▼
17//! Memory / File / S3 / SQLite  ← 后端实现
18//! ```
19//!
20//! # Phase 6: Execution Frame Snapshot
21//!
22//! 核心洞察:checkpoint 不是保存 state,而是保存 execution position + state projection。
23//!
24//! ```text
25//! checkpoint 的边界单位是 Graph Execution Frame,不是 WorkflowState 或 Node。
26//!
27//! 正确模型:
28//!   Graph Execution = Frame Stack
29//!
30//! Frame = {
31//!     graph_id,
32//!     node_id,
33//!     state_snapshot,
34//!     cursor,
35//! }
36//!
37//! checkpoint = FrameStack snapshot
38//! ```
39
40use std::fmt::Debug;
41
42use serde::{Deserialize, Serialize};
43
44use crate::state::State;
45use crate::workflow_state::WorkflowState;
46
47// ─── CheckpointId ──────────────────────────────────────────────
48
49/// Checkpoint 唯一标识。
50#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
51pub struct CheckpointId(pub uuid::Uuid);
52
53impl std::fmt::Display for CheckpointId {
54    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
55        write!(f, "{}", self.0)
56    }
57}
58
59// ─── NodeId ────────────────────────────────────────────────────
60
61/// 节点标识。
62#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
63pub struct NodeId(pub String);
64
65impl std::fmt::Display for NodeId {
66    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
67        write!(f, "{}", self.0)
68    }
69}
70
71// ─── Checkpoint ────────────────────────────────────────────────
72
73/// 执行检查点 — 物化快照 + 执行游标。
74///
75/// Checkpoint 的唯一职责:恢复(Restore)。
76/// 给我一个 Checkpoint,我就能从 `current_node` 开始,用 `state` 继续执行。
77///
78/// # P0-1: Checkpoint Projection
79///
80/// `state` 字段使用 `S::Checkpoint`(关联类型),不是 `S`(Runtime State)。
81/// 这保证:
82/// - Runtime State 可以包含不可序列化字段(`Arc<dyn ...>`, `Sender`, `Cache`)
83/// - Checkpoint 只序列化必要字段
84/// - 编译期保证可序列化
85///
86/// # Graph Compatibility
87///
88/// `graph_hash` 记录创建 Checkpoint 时的图结构指纹。
89/// 恢复时必须校验:`graph_hash` 不匹配 → 拒绝恢复(不允许 silent mismatch)。
90#[derive(Debug, Clone, Serialize, Deserialize)]
91pub struct Checkpoint<S: WorkflowState = State> {
92    /// 唯一标识
93    pub checkpoint_id: CheckpointId,
94    /// 下一个要执行的节点
95    pub current_node: NodeId,
96    /// 物化状态快照(P0-1: 使用 Checkpoint 关联类型,不是 raw State)
97    pub state: S::Checkpoint,
98    /// 图结构指纹 — 恢复时校验兼容性
99    pub graph_hash: u64,
100    /// 创建时间
101    pub created_at: std::time::SystemTime,
102}
103
104impl<S: WorkflowState> Checkpoint<S> {
105    /// 从 Runtime State 创建 Checkpoint(使用 snapshot() 投影)。
106    pub fn new(current_node: impl Into<String>, state: &S, graph_hash: u64) -> Self {
107        Self {
108            checkpoint_id: CheckpointId(uuid::Uuid::new_v4()),
109            current_node: NodeId(current_node.into()),
110            state: state.snapshot(),
111            graph_hash,
112            created_at: std::time::SystemTime::now(),
113        }
114    }
115
116    /// 从 Checkpoint 恢复 Runtime State(使用 restore())。
117    pub fn restore_state(self) -> S {
118        S::restore(self.state)
119    }
120}
121
122// ─── CheckpointBlob ────────────────────────────────────────────
123
124/// 跨 Codec 的统一载体 — 存储层操作的对象。
125///
126/// 将序列化后的二进制数据与元数据打包,供 CheckpointStore 使用。
127/// 存储层无需知道 State 类型或序列化格式。
128///
129/// `graph_hash` 作为 correctness invariant 存储:
130/// 恢复时校验 `graph_hash` 不匹配 → reject,不允许 silent mismatch。
131#[derive(Debug, Clone)]
132pub struct CheckpointBlob {
133    /// Checkpoint 唯一标识
134    pub id: CheckpointId,
135    /// 序列化后的二进制数据(格式由 Codec 决定)
136    pub data: Vec<u8>,
137    /// 图结构指纹 — 恢复时校验兼容性
138    pub graph_hash: u64,
139    /// 创建时间
140    pub created_at: std::time::SystemTime,
141}
142
143impl CheckpointBlob {
144    pub fn new(
145        id: CheckpointId,
146        data: Vec<u8>,
147        graph_hash: u64,
148        created_at: std::time::SystemTime,
149    ) -> Self {
150        Self {
151            id,
152            data,
153            graph_hash,
154            created_at,
155        }
156    }
157}
158
159// ─── CheckpointStoreError ──────────────────────────────────────
160
161/// Checkpoint 存储操作错误。
162#[derive(Debug, thiserror::Error)]
163pub enum CheckpointStoreError {
164    #[error("storage error: {0}")]
165    Storage(String),
166    #[error("checkpoint not found: {0}")]
167    NotFound(CheckpointId),
168    #[error("corrupted checkpoint: {0}")]
169    Corrupted(String),
170    #[error("serialization error: {0}")]
171    Serialization(String),
172    #[error("graph mismatch: expected hash {expected:#018x}, got {actual:#018x}")]
173    GraphMismatch { expected: u64, actual: u64 },
174}
175
176// ─── TraceId Re-export ─────────────────────────────────────────
177
178/// 从 ids 模块重导出 TraceId。
179///
180/// 注意:Checkpoint 结构体**不包含** trace_id。
181/// 关联关系由存储层组织(如同一目录下的文件)。
182pub use crate::ids::TraceId;
183
184// ─── CheckpointPolicy 已迁移 ──────────────────────────────────
185
186/// 向后兼容 — CheckpointPolicy 已迁移至 checkpoint_policy 模块。
187/// v0.5 使用 TriggerPolicy + RetentionPolicy 替代。
188#[allow(deprecated)]
189#[doc(inline)]
190pub use crate::checkpoint_policy::CheckpointPolicy;
191
192// ─── Phase 6: Execution Frame Snapshot ────────────────────────
193
194/// 执行帧 — 保存单个 Graph 的执行位置。
195///
196/// 可序列化 — 用于 SessionCheckpoint 持久化。
197#[derive(Debug, Clone, Serialize, Deserialize)]
198pub struct Frame<S: WorkflowState = State> {
199    /// 图 ID
200    pub graph_id: String,
201
202    /// 当前节点 ID
203    pub node_id: String,
204
205    /// 状态快照(P0-1: 使用 Checkpoint 关联类型,可序列化)
206    pub state: S::Checkpoint,
207
208    /// 执行游标(节点索引或步骤数)
209    pub cursor: usize,
210}
211
212impl<S: WorkflowState> Frame<S> {
213    /// 从 Runtime State 创建 Frame(使用 snapshot() 投影)。
214    pub fn new(graph_id: String, node_id: String, state: &S, cursor: usize) -> Self {
215        Self {
216            graph_id,
217            node_id,
218            state: state.snapshot(),
219            cursor,
220        }
221    }
222}
223
224/// 帧栈 — 保存完整的执行位置历史。
225///
226/// 可序列化 — 用于 SessionCheckpoint 持久化。
227#[derive(Debug, Clone, Serialize, Deserialize)]
228pub struct FrameStack<S: WorkflowState = State>
229where
230    S::Checkpoint: Debug,
231{
232    /// 帧列表(从外到内)
233    frames: Vec<Frame<S>>,
234}
235
236impl<S: WorkflowState> FrameStack<S>
237where
238    S::Checkpoint: Debug,
239{
240    /// 创建空的帧栈。
241    pub fn new() -> Self {
242        Self { frames: Vec::new() }
243    }
244
245    /// Push 一个新帧。
246    pub fn push(&mut self, frame: Frame<S>) {
247        self.frames.push(frame);
248    }
249
250    /// Pop 最后一个帧。
251    pub fn pop(&mut self) -> Option<Frame<S>> {
252        self.frames.pop()
253    }
254
255    /// 获取当前帧(最顶层)。
256    pub fn current(&self) -> Option<&Frame<S>> {
257        self.frames.last()
258    }
259
260    /// 获取帧数量。
261    pub fn depth(&self) -> usize {
262        self.frames.len()
263    }
264
265    /// 检查是否为空。
266    pub fn is_empty(&self) -> bool {
267        self.frames.is_empty()
268    }
269
270    /// 获取所有帧的引用。
271    pub fn frames(&self) -> &[Frame<S>] {
272        &self.frames
273    }
274}
275
276impl<S: WorkflowState> Default for FrameStack<S>
277where
278    S::Checkpoint: Debug,
279{
280    fn default() -> Self {
281        Self::new()
282    }
283}