Skip to main content

a3s_code_core/
loop_checkpoint.rs

1//! Per-tool-round loop checkpoints for crash-tolerant runs (P3 cut 1).
2//!
3//! The agent loop persists a [`LoopCheckpoint`] after each completed tool
4//! round. The checkpoint captures the minimum state needed to recreate
5//! the loop's position so a future process — typically on a different
6//! node, dispatched by 书安OS after a crash or planned migration — can
7//! resume from the last consistent boundary.
8//!
9//! Boundary policy: checkpoints are taken **only** between tool rounds,
10//! never mid-tool. If a process dies while a tool is executing, the
11//! work of that round is lost on resume; the LLM re-deliberates from
12//! the previous checkpoint. This trades retry cost for correctness —
13//! re-executing a non-idempotent tool (write, bash) on the wrong side
14//! of the boundary is worse than re-asking the LLM.
15//!
16//! Resume API (cut 2 follow-up): not part of this cut. This module
17//! lands the data contract + persistence wiring; an
18//! `AgentSession::resume_run(run_id)` entry point will live on top.
19
20use crate::llm::{Message, TokenUsage};
21use crate::verification::VerificationReport;
22use async_trait::async_trait;
23use serde::{Deserialize, Serialize};
24
/// Schema version of the [`LoopCheckpoint`] format understood by this build.
///
/// Bumped on incompatible format changes. Impls of [`LoopCheckpointSink`]
/// should reject loads from a future version;
/// [`LoopCheckpoint::ensure_loadable`] performs exactly that comparison.
pub const LOOP_CHECKPOINT_SCHEMA_VERSION: u32 = 1;
28
/// Snapshot of the agent loop at the boundary between tool rounds.
///
/// Stored under `run_id` so resume tooling can address the correct run
/// without scanning all checkpoints of a session.
///
/// The type is (de)serialized with serde. Fields marked
/// `#[serde(default)]` may be absent from a stored payload; every other
/// field is required for deserialization to succeed.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoopCheckpoint {
    /// Schema version — see [`LOOP_CHECKPOINT_SCHEMA_VERSION`].
    ///
    /// Falls back to `0` when the field is missing from the payload,
    /// which marks the checkpoint as a pre-v1 snapshot.
    #[serde(default)]
    pub schema_version: u32,

    /// Logical run identifier. Matches the `run_id` carried by
    /// [`crate::run::RunSnapshot`] and `AgentEvent`s.
    pub run_id: String,

    /// Parent session id — redundant with `run_id` lookup but useful
    /// for store layouts that key by `(session_id, run_id)`.
    pub session_id: String,

    /// 1-based tool round counter at checkpoint time.
    /// `0` is reserved for "no rounds completed yet".
    pub turn: usize,

    /// Conversation history including the just-returned tool results.
    /// On resume, the new agent loop starts from this exact message list.
    pub messages: Vec<Message>,

    /// Running token usage at checkpoint time. Lets resume re-emit
    /// progress metrics without re-querying the LLM provider.
    pub total_usage: TokenUsage,

    /// How many tool calls have been executed total in this run.
    pub tool_calls_count: usize,

    /// Verification reports collected so far in this run.
    ///
    /// Falls back to an empty list when the field is missing from the
    /// payload.
    #[serde(default)]
    pub verification_reports: Vec<VerificationReport>,

    /// Wall-clock timestamp when the checkpoint was written
    /// (Unix epoch ms — sourced from the session's
    /// [`HostEnv`](crate::host_env::HostEnv)).
    pub checkpoint_ms: u64,
}
71
72impl LoopCheckpoint {
73    /// Reject a checkpoint written by a *newer*, incompatible schema
74    /// version than this build understands — honoring the contract on
75    /// [`LOOP_CHECKPOINT_SCHEMA_VERSION`].
76    ///
77    /// Field *additions* are absorbed transparently by `#[serde(default)]`,
78    /// so an older checkpoint (lower `schema_version`, including a pre-v1
79    /// `0`) always remains loadable. A *future* version, however, may have
80    /// changed the meaning of existing fields or the tool-round boundary
81    /// semantics; resuming from one risks silent corruption (e.g.
82    /// re-running a non-idempotent tool on the wrong side of the boundary).
83    ///
84    /// [`SessionStore`](crate::store::SessionStore) impls call this right
85    /// after deserialization, so both `resume_run` (which surfaces the
86    /// error to the caller) and the live-run [`LoopCheckpointSink`] (which
87    /// logs and starts fresh) refuse to act on an unreadable checkpoint.
88    pub fn ensure_loadable(&self) -> anyhow::Result<()> {
89        if self.schema_version > LOOP_CHECKPOINT_SCHEMA_VERSION {
90            anyhow::bail!(
91                "loop checkpoint for run {} has schema version {} but this build supports at \
92                 most {}; refusing to resume from an incompatible future checkpoint",
93                self.run_id,
94                self.schema_version,
95                LOOP_CHECKPOINT_SCHEMA_VERSION
96            );
97        }
98        Ok(())
99    }
100}
101
/// Receiver of per-tool-round checkpoints.
///
/// The framework ships one adapter:
/// [`SessionStoreCheckpointSink`] which forwards to a
/// [`crate::store::SessionStore`]. Hosts can implement custom sinks
/// (e.g. push directly to Redis) by implementing this trait.
///
/// Neither method can return an error, so implementations have to deal
/// with storage failures themselves instead of propagating them.
#[async_trait]
pub trait LoopCheckpointSink: Send + Sync {
    /// Persist a checkpoint. Called from inside the agent loop after a
    /// successful tool round.
    ///
    /// Failures must be handled inside the implementation — the bundled
    /// adapter logs them at warn level and otherwise swallows them —
    /// because losing a checkpoint must not halt the live run.
    async fn save_checkpoint(&self, checkpoint: &LoopCheckpoint);

    /// Load the latest checkpoint for `run_id`, if any.
    ///
    /// Returns `None` when no checkpoint has been recorded. The bundled
    /// adapter also returns `None` (after logging a warning) when the
    /// underlying load fails, so callers cannot distinguish "absent"
    /// from "unreadable" through this method.
    async fn load_latest(&self, run_id: &str) -> Option<LoopCheckpoint>;
}
120
121/// Default adapter that forwards checkpoints to a
122/// [`SessionStore`](crate::store::SessionStore). Construct via
123/// [`SessionStoreCheckpointSink::new`].
124pub struct SessionStoreCheckpointSink {
125    inner: std::sync::Arc<dyn crate::store::SessionStore>,
126}
127
128impl SessionStoreCheckpointSink {
129    pub fn new(store: std::sync::Arc<dyn crate::store::SessionStore>) -> Self {
130        Self { inner: store }
131    }
132}
133
134#[async_trait]
135impl LoopCheckpointSink for SessionStoreCheckpointSink {
136    async fn save_checkpoint(&self, checkpoint: &LoopCheckpoint) {
137        if let Err(e) = self
138            .inner
139            .save_loop_checkpoint(&checkpoint.run_id, checkpoint)
140            .await
141        {
142            tracing::warn!(
143                run_id = %checkpoint.run_id,
144                error = %e,
145                "Loop checkpoint save failed; live run continues"
146            );
147        }
148    }
149
150    async fn load_latest(&self, run_id: &str) -> Option<LoopCheckpoint> {
151        match self.inner.load_loop_checkpoint(run_id).await {
152            Ok(opt) => opt,
153            Err(e) => {
154                tracing::warn!(
155                    run_id = %run_id,
156                    error = %e,
157                    "Loop checkpoint load failed"
158                );
159                None
160            }
161        }
162    }
163}
164
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a minimal checkpoint at the current schema version for
    /// `run_id`, positioned at tool round `turn`.
    fn sample(run_id: &str, turn: usize) -> LoopCheckpoint {
        LoopCheckpoint {
            schema_version: LOOP_CHECKPOINT_SCHEMA_VERSION,
            run_id: run_id.to_string(),
            session_id: "session-1".to_string(),
            turn,
            messages: vec![Message::user("hi")],
            total_usage: TokenUsage::default(),
            tool_calls_count: 0,
            verification_reports: Vec::new(),
            checkpoint_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn checkpoint_round_trips_through_json() {
        let cp = sample("run-1", 3);
        let json = serde_json::to_string(&cp).unwrap();
        let back: LoopCheckpoint = serde_json::from_str(&json).unwrap();
        assert_eq!(back.run_id, "run-1");
        assert_eq!(back.turn, 3);
        assert_eq!(back.schema_version, LOOP_CHECKPOINT_SCHEMA_VERSION);
    }

    #[test]
    fn missing_schema_version_defaults_to_zero() {
        // Older payloads without the field must still load — they'll
        // be interpreted as a pre-v1 snapshot.
        let json = r#"{
            "run_id": "run-1",
            "session_id": "s",
            "turn": 1,
            "messages": [],
            "total_usage": {"prompt_tokens":0,"completion_tokens":0,"total_tokens":0},
            "tool_calls_count": 0,
            "checkpoint_ms": 0
        }"#;
        let cp: LoopCheckpoint = serde_json::from_str(json).unwrap();
        assert_eq!(cp.schema_version, 0);
        // `verification_reports` is also absent above and must default
        // to an empty list rather than fail deserialization.
        assert!(cp.verification_reports.is_empty());
    }

    #[test]
    fn ensure_loadable_accepts_current_and_older_versions() {
        let mut cp = sample("run-1", 1);
        assert!(cp.ensure_loadable().is_ok());

        // A pre-v1 snapshot (missing field => 0) must stay loadable.
        cp.schema_version = 0;
        assert!(cp.ensure_loadable().is_ok());
    }

    #[test]
    fn ensure_loadable_rejects_future_version() {
        let future = LOOP_CHECKPOINT_SCHEMA_VERSION + 1;
        let mut cp = sample("run-future", 1);
        cp.schema_version = future;

        let msg = cp.ensure_loadable().unwrap_err().to_string();
        // The error should identify both the run and the offending version
        // so operators can locate the checkpoint.
        assert!(msg.contains("run-future"));
        assert!(msg.contains(&future.to_string()));
    }
}