Skip to main content

a3s_code_core/
loop_checkpoint.rs

1//! Per-tool-round loop checkpoints for crash-tolerant runs (P3 cut 1).
2//!
3//! The agent loop persists a [`LoopCheckpoint`] after each completed tool
4//! round. The checkpoint captures the minimum state needed to recreate
5//! the loop's position so a future process — typically on a different
6//! node, dispatched by 书安OS after a crash or planned migration — can
7//! resume from the last consistent boundary.
8//!
9//! Boundary policy: checkpoints are taken **only** between tool rounds,
10//! never mid-tool. If a process dies while a tool is executing, the
11//! work of that round is lost on resume; the LLM re-deliberates from
12//! the previous checkpoint. This trades retry cost for correctness —
13//! re-executing a non-idempotent tool (write, bash) on the wrong side
14//! of the boundary is worse than re-asking the LLM.
15//!
16//! Resume API (cut 2 follow-up): not part of this cut. This module
17//! lands the data contract + persistence wiring; an
18//! `AgentSession::resume_run(run_id)` entry point will live on top.
19
20use crate::llm::{Message, TokenUsage};
21use crate::verification::VerificationReport;
22use async_trait::async_trait;
23use serde::{Deserialize, Serialize};
24
/// Current on-disk format version of [`LoopCheckpoint`].
///
/// Increment this whenever the checkpoint layout changes in a way that is
/// not backward-compatible. Implementations of [`LoopCheckpointSink`] are
/// expected to refuse checkpoints stamped with a version newer than this.
pub const LOOP_CHECKPOINT_SCHEMA_VERSION: u32 = 1;
28
29/// Snapshot of the agent loop at the boundary between tool rounds.
30///
31/// Stored under `run_id` so resume tooling can address the correct run
32/// without scanning all checkpoints of a session.
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct LoopCheckpoint {
35    /// Schema version — see [`LOOP_CHECKPOINT_SCHEMA_VERSION`].
36    #[serde(default)]
37    pub schema_version: u32,
38
39    /// Logical run identifier. Matches the `run_id` carried by
40    /// [`crate::run::RunSnapshot`] and `AgentEvent`s.
41    pub run_id: String,
42
43    /// Parent session id — redundant with `run_id` lookup but useful
44    /// for store layouts that key by `(session_id, run_id)`.
45    pub session_id: String,
46
47    /// 1-based tool round counter at checkpoint time.
48    /// `0` is reserved for "no rounds completed yet".
49    pub turn: usize,
50
51    /// Conversation history including the just-returned tool results.
52    /// On resume, the new agent loop starts from this exact message list.
53    pub messages: Vec<Message>,
54
55    /// Running token usage at checkpoint time. Lets resume re-emit
56    /// progress metrics without re-querying the LLM provider.
57    pub total_usage: TokenUsage,
58
59    /// How many tool calls have been executed total in this run.
60    pub tool_calls_count: usize,
61
62    /// Verification reports collected so far in this run.
63    #[serde(default)]
64    pub verification_reports: Vec<VerificationReport>,
65
66    /// Wall-clock timestamp when the checkpoint was written
67    /// (Unix epoch ms — sourced from the session's
68    /// [`HostEnv`](crate::host_env::HostEnv)).
69    pub checkpoint_ms: u64,
70}
71
72/// Receiver of per-tool-round checkpoints.
73///
74/// The framework ships one adapter:
75/// [`SessionStoreCheckpointSink`] which forwards to a
76/// [`crate::store::SessionStore`]. Hosts can implement custom sinks
77/// (e.g. push directly to Redis) by implementing this trait.
78#[async_trait]
79pub trait LoopCheckpointSink: Send + Sync {
80    /// Persist a checkpoint. Called from inside the agent loop after a
81    /// successful tool round. Errors are logged at warn level and
82    /// otherwise swallowed — losing a checkpoint must not halt the
83    /// live run.
84    async fn save_checkpoint(&self, checkpoint: &LoopCheckpoint);
85
86    /// Load the latest checkpoint for `run_id`, if any. Returns `None`
87    /// when no checkpoint has been recorded.
88    async fn load_latest(&self, run_id: &str) -> Option<LoopCheckpoint>;
89}
90
91/// Default adapter that forwards checkpoints to a
92/// [`SessionStore`](crate::store::SessionStore). Construct via
93/// [`SessionStoreCheckpointSink::new`].
94pub struct SessionStoreCheckpointSink {
95    inner: std::sync::Arc<dyn crate::store::SessionStore>,
96}
97
98impl SessionStoreCheckpointSink {
99    pub fn new(store: std::sync::Arc<dyn crate::store::SessionStore>) -> Self {
100        Self { inner: store }
101    }
102}
103
104#[async_trait]
105impl LoopCheckpointSink for SessionStoreCheckpointSink {
106    async fn save_checkpoint(&self, checkpoint: &LoopCheckpoint) {
107        if let Err(e) = self
108            .inner
109            .save_loop_checkpoint(&checkpoint.run_id, checkpoint)
110            .await
111        {
112            tracing::warn!(
113                run_id = %checkpoint.run_id,
114                error = %e,
115                "Loop checkpoint save failed; live run continues"
116            );
117        }
118    }
119
120    async fn load_latest(&self, run_id: &str) -> Option<LoopCheckpoint> {
121        match self.inner.load_loop_checkpoint(run_id).await {
122            Ok(opt) => opt,
123            Err(e) => {
124                tracing::warn!(
125                    run_id = %run_id,
126                    error = %e,
127                    "Loop checkpoint load failed"
128                );
129                None
130            }
131        }
132    }
133}
134
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a minimal checkpoint for `run_id` at round `turn`, stamped
    /// with the current schema version.
    fn sample(run_id: &str, turn: usize) -> LoopCheckpoint {
        let messages = vec![Message::user("hi")];
        LoopCheckpoint {
            schema_version: LOOP_CHECKPOINT_SCHEMA_VERSION,
            run_id: String::from(run_id),
            session_id: String::from("session-1"),
            turn,
            messages,
            total_usage: TokenUsage::default(),
            tool_calls_count: 0,
            verification_reports: vec![],
            checkpoint_ms: 1_700_000_000_000,
        }
    }

    #[test]
    fn checkpoint_round_trips_through_json() {
        let original = sample("run-1", 3);
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded = serde_json::from_str::<LoopCheckpoint>(&encoded).unwrap();
        assert_eq!(
            (decoded.run_id.as_str(), decoded.turn, decoded.schema_version),
            ("run-1", 3, LOOP_CHECKPOINT_SCHEMA_VERSION)
        );
    }

    #[test]
    fn missing_schema_version_defaults_to_zero() {
        // Payloads written before the version field existed must still
        // deserialize; serde fills the gap with 0, i.e. a pre-v1 snapshot.
        let payload = r#"{
            "run_id": "run-1",
            "session_id": "s",
            "turn": 1,
            "messages": [],
            "total_usage": {"prompt_tokens":0,"completion_tokens":0,"total_tokens":0},
            "tool_calls_count": 0,
            "checkpoint_ms": 0
        }"#;
        let parsed = serde_json::from_str::<LoopCheckpoint>(payload).unwrap();
        assert_eq!(parsed.schema_version, 0);
    }
}