a3s_code_core/loop_checkpoint.rs
1//! Per-tool-round loop checkpoints for crash-tolerant runs (P3 cut 1).
2//!
3//! The agent loop persists a [`LoopCheckpoint`] after each completed tool
4//! round. The checkpoint captures the minimum state needed to recreate
5//! the loop's position so a future process — typically on a different
6//! node, dispatched by 书安OS after a crash or planned migration — can
7//! resume from the last consistent boundary.
8//!
9//! Boundary policy: checkpoints are taken **only** between tool rounds,
10//! never mid-tool. If a process dies while a tool is executing, the
11//! work of that round is lost on resume; the LLM re-deliberates from
12//! the previous checkpoint. This trades retry cost for correctness —
13//! re-executing a non-idempotent tool (write, bash) on the wrong side
14//! of the boundary is worse than re-asking the LLM.
15//!
16//! Resume API (cut 2 follow-up): not part of this cut. This module
17//! lands the data contract + persistence wiring; an
18//! `AgentSession::resume_run(run_id)` entry point will live on top.
19
20use crate::llm::{Message, TokenUsage};
21use crate::verification::VerificationReport;
22use async_trait::async_trait;
23use serde::{Deserialize, Serialize};
24
/// Schema version of the [`LoopCheckpoint`] format.
///
/// Bumped on incompatible format changes; impls of
/// [`LoopCheckpointSink`] should reject loads from a future version.
///
/// Payloads that omit the `schema_version` field deserialize with the
/// value `0`, which is therefore treated as "pre-v1".
pub const LOOP_CHECKPOINT_SCHEMA_VERSION: u32 = 1;
28
/// Snapshot of the agent loop at the boundary between tool rounds.
///
/// Stored under `run_id` so resume tooling can address the correct run
/// without scanning all checkpoints of a session.
///
/// The struct is (de)serialized with serde. `schema_version` and
/// `verification_reports` are marked `#[serde(default)]`, so payloads
/// that omit them still load (as `0` and an empty list respectively).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoopCheckpoint {
    /// Schema version — see [`LOOP_CHECKPOINT_SCHEMA_VERSION`].
    ///
    /// Falls back to `0` when the field is absent from the payload.
    #[serde(default)]
    pub schema_version: u32,

    /// Logical run identifier. Matches the `run_id` carried by
    /// [`crate::run::RunSnapshot`] and `AgentEvent`s.
    pub run_id: String,

    /// Parent session id — redundant with `run_id` lookup but useful
    /// for store layouts that key by `(session_id, run_id)`.
    pub session_id: String,

    /// 1-based tool round counter at checkpoint time.
    /// `0` is reserved for "no rounds completed yet".
    pub turn: usize,

    /// Conversation history including the just-returned tool results.
    /// On resume, the new agent loop starts from this exact message list.
    pub messages: Vec<Message>,

    /// Running token usage at checkpoint time. Lets resume re-emit
    /// progress metrics without re-querying the LLM provider.
    pub total_usage: TokenUsage,

    /// How many tool calls have been executed total in this run.
    pub tool_calls_count: usize,

    /// Verification reports collected so far in this run.
    ///
    /// Falls back to an empty list when the field is absent from the
    /// payload.
    #[serde(default)]
    pub verification_reports: Vec<VerificationReport>,

    /// Wall-clock timestamp when the checkpoint was written
    /// (Unix epoch ms — sourced from the session's
    /// [`HostEnv`](crate::host_env::HostEnv)).
    pub checkpoint_ms: u64,
}
71
/// Receiver of per-tool-round checkpoints.
///
/// The framework ships one adapter:
/// [`SessionStoreCheckpointSink`] which forwards to a
/// [`crate::store::SessionStore`]. Hosts can implement custom sinks
/// (e.g. push directly to Redis) by implementing this trait.
///
/// Implementors must be `Send + Sync`.
#[async_trait]
pub trait LoopCheckpointSink: Send + Sync {
    /// Persist a checkpoint. Called from inside the agent loop after a
    /// successful tool round. Errors are logged at warn level and
    /// otherwise swallowed — losing a checkpoint must not halt the
    /// live run.
    ///
    /// The method returns `()`, so an implementation has no channel to
    /// report a failure back to the loop; it has to deal with the
    /// error itself.
    async fn save_checkpoint(&self, checkpoint: &LoopCheckpoint);

    /// Load the latest checkpoint for `run_id`, if any. Returns `None`
    /// when no checkpoint has been recorded.
    ///
    /// Because the return type is a plain `Option`, a failed load
    /// cannot be told apart from a missing checkpoint by the caller.
    async fn load_latest(&self, run_id: &str) -> Option<LoopCheckpoint>;
}
90
91/// Default adapter that forwards checkpoints to a
92/// [`SessionStore`](crate::store::SessionStore). Construct via
93/// [`SessionStoreCheckpointSink::new`].
94pub struct SessionStoreCheckpointSink {
95 inner: std::sync::Arc<dyn crate::store::SessionStore>,
96}
97
98impl SessionStoreCheckpointSink {
99 pub fn new(store: std::sync::Arc<dyn crate::store::SessionStore>) -> Self {
100 Self { inner: store }
101 }
102}
103
104#[async_trait]
105impl LoopCheckpointSink for SessionStoreCheckpointSink {
106 async fn save_checkpoint(&self, checkpoint: &LoopCheckpoint) {
107 if let Err(e) = self
108 .inner
109 .save_loop_checkpoint(&checkpoint.run_id, checkpoint)
110 .await
111 {
112 tracing::warn!(
113 run_id = %checkpoint.run_id,
114 error = %e,
115 "Loop checkpoint save failed; live run continues"
116 );
117 }
118 }
119
120 async fn load_latest(&self, run_id: &str) -> Option<LoopCheckpoint> {
121 match self.inner.load_loop_checkpoint(run_id).await {
122 Ok(opt) => opt,
123 Err(e) => {
124 tracing::warn!(
125 run_id = %run_id,
126 error = %e,
127 "Loop checkpoint load failed"
128 );
129 None
130 }
131 }
132 }
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a current-version checkpoint with one user message, zero
    /// tool calls, no verification reports and a fixed timestamp.
    fn sample(run_id: &str, turn: usize) -> LoopCheckpoint {
        LoopCheckpoint {
            schema_version: LOOP_CHECKPOINT_SCHEMA_VERSION,
            run_id: run_id.to_string(),
            session_id: "session-1".to_string(),
            turn,
            messages: vec![Message::user("hi")],
            total_usage: TokenUsage::default(),
            tool_calls_count: 0,
            verification_reports: Vec::new(),
            checkpoint_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn checkpoint_round_trips_through_json() {
        let cp = sample("run-1", 3);
        let json = serde_json::to_string(&cp).expect("checkpoint should serialize");
        let back: LoopCheckpoint =
            serde_json::from_str(&json).expect("serialized checkpoint should deserialize");

        // Check every field that can be compared without relying on
        // `PartialEq` impls of project types.
        assert_eq!(back.schema_version, LOOP_CHECKPOINT_SCHEMA_VERSION);
        assert_eq!(back.run_id, "run-1");
        assert_eq!(back.session_id, "session-1");
        assert_eq!(back.turn, 3);
        assert_eq!(back.messages.len(), 1);
        assert_eq!(back.tool_calls_count, 0);
        assert!(back.verification_reports.is_empty());
        assert_eq!(back.checkpoint_ms, 1_700_000_000_000);
    }

    #[test]
    fn missing_schema_version_defaults_to_zero() {
        // Older payloads without the field must still load — they'll
        // be interpreted as a pre-v1 snapshot.
        let json = r#"{
            "run_id": "run-1",
            "session_id": "s",
            "turn": 1,
            "messages": [],
            "total_usage": {"prompt_tokens":0,"completion_tokens":0,"total_tokens":0},
            "tool_calls_count": 0,
            "checkpoint_ms": 0
        }"#;
        let cp: LoopCheckpoint =
            serde_json::from_str(json).expect("payload without optional fields should load");
        assert_eq!(cp.schema_version, 0);
        // The payload also omits `verification_reports`, which has a
        // serde default as well.
        assert!(cp.verification_reports.is_empty());
    }
}