a3s_code_core/loop_checkpoint.rs
1//! Per-tool-round loop checkpoints for crash-tolerant runs (P3 cut 1).
2//!
3//! The agent loop persists a [`LoopCheckpoint`] after each completed tool
4//! round. The checkpoint captures the minimum state needed to recreate
5//! the loop's position so a future process — typically on a different
6//! node, dispatched by 书安OS after a crash or planned migration — can
7//! resume from the last consistent boundary.
8//!
9//! Boundary policy: checkpoints are taken **only** between tool rounds,
10//! never mid-tool. If a process dies while a tool is executing, the
11//! work of that round is lost on resume; the LLM re-deliberates from
12//! the previous checkpoint. This trades retry cost for correctness —
13//! re-executing a non-idempotent tool (write, bash) on the wrong side
14//! of the boundary is worse than re-asking the LLM.
15//!
16//! Resume API (cut 2 follow-up): not part of this cut. This module
17//! lands the data contract + persistence wiring; an
18//! `AgentSession::resume_run(run_id)` entry point will live on top.
19
20use crate::llm::{Message, TokenUsage};
21use crate::verification::VerificationReport;
22use async_trait::async_trait;
23use serde::{Deserialize, Serialize};
24
/// Version tag written into every [`LoopCheckpoint`] produced by this build.
///
/// Increment it whenever the on-disk format changes incompatibly.
/// [`LoopCheckpointSink`] implementations are expected to refuse a
/// checkpoint whose version is greater than this value.
pub const LOOP_CHECKPOINT_SCHEMA_VERSION: u32 = 1;
28
29/// Snapshot of the agent loop at the boundary between tool rounds.
30///
31/// Stored under `run_id` so resume tooling can address the correct run
32/// without scanning all checkpoints of a session.
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct LoopCheckpoint {
35 /// Schema version — see [`LOOP_CHECKPOINT_SCHEMA_VERSION`].
36 #[serde(default)]
37 pub schema_version: u32,
38
39 /// Logical run identifier. Matches the `run_id` carried by
40 /// [`crate::run::RunSnapshot`] and `AgentEvent`s.
41 pub run_id: String,
42
43 /// Parent session id — redundant with `run_id` lookup but useful
44 /// for store layouts that key by `(session_id, run_id)`.
45 pub session_id: String,
46
47 /// 1-based tool round counter at checkpoint time.
48 /// `0` is reserved for "no rounds completed yet".
49 pub turn: usize,
50
51 /// Conversation history including the just-returned tool results.
52 /// On resume, the new agent loop starts from this exact message list.
53 pub messages: Vec<Message>,
54
55 /// Running token usage at checkpoint time. Lets resume re-emit
56 /// progress metrics without re-querying the LLM provider.
57 pub total_usage: TokenUsage,
58
59 /// How many tool calls have been executed total in this run.
60 pub tool_calls_count: usize,
61
62 /// Verification reports collected so far in this run.
63 #[serde(default)]
64 pub verification_reports: Vec<VerificationReport>,
65
66 /// Wall-clock timestamp when the checkpoint was written
67 /// (Unix epoch ms — sourced from the session's
68 /// [`HostEnv`](crate::host_env::HostEnv)).
69 pub checkpoint_ms: u64,
70}
71
72impl LoopCheckpoint {
73 /// Reject a checkpoint written by a *newer*, incompatible schema
74 /// version than this build understands — honoring the contract on
75 /// [`LOOP_CHECKPOINT_SCHEMA_VERSION`].
76 ///
77 /// Field *additions* are absorbed transparently by `#[serde(default)]`,
78 /// so an older checkpoint (lower `schema_version`, including a pre-v1
79 /// `0`) always remains loadable. A *future* version, however, may have
80 /// changed the meaning of existing fields or the tool-round boundary
81 /// semantics; resuming from one risks silent corruption (e.g.
82 /// re-running a non-idempotent tool on the wrong side of the boundary).
83 ///
84 /// [`SessionStore`](crate::store::SessionStore) impls call this right
85 /// after deserialization, so both `resume_run` (which surfaces the
86 /// error to the caller) and the live-run [`LoopCheckpointSink`] (which
87 /// logs and starts fresh) refuse to act on an unreadable checkpoint.
88 pub fn ensure_loadable(&self) -> anyhow::Result<()> {
89 if self.schema_version > LOOP_CHECKPOINT_SCHEMA_VERSION {
90 anyhow::bail!(
91 "loop checkpoint for run {} has schema version {} but this build supports at \
92 most {}; refusing to resume from an incompatible future checkpoint",
93 self.run_id,
94 self.schema_version,
95 LOOP_CHECKPOINT_SCHEMA_VERSION
96 );
97 }
98 Ok(())
99 }
100}
101
102/// Receiver of per-tool-round checkpoints.
103///
104/// The framework ships one adapter:
105/// [`SessionStoreCheckpointSink`] which forwards to a
106/// [`crate::store::SessionStore`]. Hosts can implement custom sinks
107/// (e.g. push directly to Redis) by implementing this trait.
108#[async_trait]
109pub trait LoopCheckpointSink: Send + Sync {
110 /// Persist a checkpoint. Called from inside the agent loop after a
111 /// successful tool round. Errors are logged at warn level and
112 /// otherwise swallowed — losing a checkpoint must not halt the
113 /// live run.
114 async fn save_checkpoint(&self, checkpoint: &LoopCheckpoint);
115
116 /// Load the latest checkpoint for `run_id`, if any. Returns `None`
117 /// when no checkpoint has been recorded.
118 async fn load_latest(&self, run_id: &str) -> Option<LoopCheckpoint>;
119}
120
121/// Default adapter that forwards checkpoints to a
122/// [`SessionStore`](crate::store::SessionStore). Construct via
123/// [`SessionStoreCheckpointSink::new`].
124pub struct SessionStoreCheckpointSink {
125 inner: std::sync::Arc<dyn crate::store::SessionStore>,
126}
127
128impl SessionStoreCheckpointSink {
129 pub fn new(store: std::sync::Arc<dyn crate::store::SessionStore>) -> Self {
130 Self { inner: store }
131 }
132}
133
134#[async_trait]
135impl LoopCheckpointSink for SessionStoreCheckpointSink {
136 async fn save_checkpoint(&self, checkpoint: &LoopCheckpoint) {
137 if let Err(e) = self
138 .inner
139 .save_loop_checkpoint(&checkpoint.run_id, checkpoint)
140 .await
141 {
142 tracing::warn!(
143 run_id = %checkpoint.run_id,
144 error = %e,
145 "Loop checkpoint save failed; live run continues"
146 );
147 }
148 }
149
150 async fn load_latest(&self, run_id: &str) -> Option<LoopCheckpoint> {
151 match self.inner.load_loop_checkpoint(run_id).await {
152 Ok(opt) => opt,
153 Err(e) => {
154 tracing::warn!(
155 run_id = %run_id,
156 error = %e,
157 "Loop checkpoint load failed"
158 );
159 None
160 }
161 }
162 }
163}
164
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal current-schema checkpoint with one user message, for the
    /// given run id and turn.
    fn checkpoint_fixture(run_id: &str, turn: usize) -> LoopCheckpoint {
        let messages = vec![Message::user("hi")];
        LoopCheckpoint {
            schema_version: LOOP_CHECKPOINT_SCHEMA_VERSION,
            run_id: run_id.to_string(),
            session_id: "session-1".to_string(),
            turn,
            messages,
            total_usage: TokenUsage::default(),
            tool_calls_count: 0,
            verification_reports: Vec::new(),
            checkpoint_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn checkpoint_round_trips_through_json() {
        // Serialize then deserialize; the identifying fields must survive.
        let encoded = serde_json::to_string(&checkpoint_fixture("run-1", 3)).unwrap();
        let decoded = serde_json::from_str::<LoopCheckpoint>(&encoded).unwrap();

        assert_eq!(
            (decoded.run_id.as_str(), decoded.turn, decoded.schema_version),
            ("run-1", 3, LOOP_CHECKPOINT_SCHEMA_VERSION)
        );
    }

    #[test]
    fn missing_schema_version_defaults_to_zero() {
        // A payload that omits `schema_version` (and
        // `verification_reports`) must still deserialize; it is treated
        // as a pre-v1 snapshot.
        let payload = r#"{
            "run_id": "run-1",
            "session_id": "s",
            "turn": 1,
            "messages": [],
            "total_usage": {"prompt_tokens":0,"completion_tokens":0,"total_tokens":0},
            "tool_calls_count": 0,
            "checkpoint_ms": 0
        }"#;

        let decoded = serde_json::from_str::<LoopCheckpoint>(payload).unwrap();
        assert_eq!(decoded.schema_version, 0);
    }
}