stakpak_server/
checkpoint_store.rs1use std::path::{Path, PathBuf};
2
3use stakpak_agent_core::{CheckpointEnvelopeV1, deserialize_checkpoint, serialize_checkpoint};
4use uuid::Uuid;
5
/// File-backed store for session checkpoints.
///
/// Each session gets its own sub-directory under `root`, holding a single
/// `latest.checkpoint` file.
#[derive(Clone, Debug)]
pub struct CheckpointStore {
    /// Directory under which the per-session checkpoint directories live.
    root: PathBuf,
}
10
11impl CheckpointStore {
12 pub fn new(root: impl Into<PathBuf>) -> Self {
13 let root = root.into();
14 let _ = std::fs::create_dir_all(&root);
15 Self { root }
16 }
17
18 pub fn default_local() -> Self {
19 let root = std::env::var("HOME")
20 .map(|home| {
21 PathBuf::from(home)
22 .join(".stakpak")
23 .join("server")
24 .join("checkpoints")
25 })
26 .unwrap_or_else(|_| PathBuf::from(".stakpak").join("server").join("checkpoints"));
27
28 Self::new(root)
29 }
30
31 pub fn root(&self) -> &Path {
32 &self.root
33 }
34
35 pub async fn load_latest(
36 &self,
37 session_id: Uuid,
38 ) -> Result<Option<CheckpointEnvelopeV1>, String> {
39 let latest_path = self.latest_path(session_id);
40
41 let payload = match tokio::fs::read(&latest_path).await {
42 Ok(payload) => payload,
43 Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(None),
44 Err(error) => {
45 return Err(format!(
46 "Failed to read checkpoint envelope from {}: {}",
47 latest_path.display(),
48 error
49 ));
50 }
51 };
52
53 deserialize_checkpoint(&payload)
54 .map(Some)
55 .map_err(|error| format!("Failed to deserialize checkpoint envelope: {error}"))
56 }
57
58 pub async fn save_latest(
59 &self,
60 session_id: Uuid,
61 envelope: &CheckpointEnvelopeV1,
62 ) -> Result<(), String> {
63 let latest_path = self.latest_path(session_id);
64 if let Some(parent) = latest_path.parent() {
65 tokio::fs::create_dir_all(parent).await.map_err(|error| {
66 format!(
67 "Failed to create checkpoint directory {}: {}",
68 parent.display(),
69 error
70 )
71 })?;
72 }
73
74 let payload = serialize_checkpoint(envelope)
75 .map_err(|error| format!("Failed to serialize checkpoint envelope: {error}"))?;
76
77 let temp_path = latest_path.with_extension("tmp");
78
79 tokio::fs::write(&temp_path, payload)
80 .await
81 .map_err(|error| {
82 format!(
83 "Failed to write temporary checkpoint envelope {}: {}",
84 temp_path.display(),
85 error
86 )
87 })?;
88
89 tokio::fs::rename(&temp_path, &latest_path)
90 .await
91 .map_err(|error| {
92 format!(
93 "Failed to finalize checkpoint envelope {}: {}",
94 latest_path.display(),
95 error
96 )
97 })?;
98
99 Ok(())
100 }
101
102 fn latest_path(&self, session_id: Uuid) -> PathBuf {
103 self.root
104 .join(session_id.to_string())
105 .join("latest.checkpoint")
106 }
107}
108
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use stakai::{Message, Role};

    /// A [`CheckpointStore`] rooted in a unique directory under the system temp dir.
    ///
    /// The directory is deleted when the fixture is dropped, so tests do not
    /// leave `stakpak-checkpoint-test-*` directories behind, including when an
    /// assertion panics and the fixture is dropped during unwinding.
    struct TempStore {
        store: CheckpointStore,
    }

    impl TempStore {
        /// Creates a store in a fresh, uniquely named temp directory.
        fn new() -> Self {
            let root =
                std::env::temp_dir().join(format!("stakpak-checkpoint-test-{}", Uuid::new_v4()));
            Self {
                store: CheckpointStore::new(root),
            }
        }
    }

    impl Drop for TempStore {
        fn drop(&mut self) {
            // Best effort: a failed cleanup must not mask the test's own outcome.
            let _ = std::fs::remove_dir_all(self.store.root());
        }
    }

    /// A saved envelope can be read back with its run id and messages intact.
    #[tokio::test]
    async fn save_and_load_checkpoint_envelope_roundtrip() {
        let fixture = TempStore::new();
        let store = &fixture.store;
        let session_id = Uuid::new_v4();
        let run_id = Uuid::new_v4();

        let envelope = CheckpointEnvelopeV1::new(
            Some(run_id),
            vec![Message::new(Role::User, "hello")],
            json!({"turn": 1}),
        );

        let save = store.save_latest(session_id, &envelope).await;
        assert!(save.is_ok());

        let loaded = store.load_latest(session_id).await;
        assert!(loaded.is_ok());

        let Some(loaded_envelope) = loaded.ok().flatten() else {
            panic!("expected checkpoint envelope");
        };

        assert_eq!(loaded_envelope.run_id, Some(run_id));
        assert_eq!(
            loaded_envelope
                .messages
                .first()
                .and_then(stakai::Message::text),
            Some("hello".to_string())
        );
    }

    /// A checkpoint file holding only a bare JSON array of messages still loads.
    #[tokio::test]
    async fn load_latest_migrates_legacy_messages_array_payload() {
        let fixture = TempStore::new();
        let store = &fixture.store;
        let session_id = Uuid::new_v4();

        let legacy_payload = json!([
            {
                "role": "user",
                "content": "legacy"
            }
        ]);

        // Spelled out by hand so the test pins the on-disk layout.
        let latest_path = store
            .root()
            .join(session_id.to_string())
            .join("latest.checkpoint");

        if let Some(parent) = latest_path.parent() {
            let create_dir = tokio::fs::create_dir_all(parent).await;
            assert!(create_dir.is_ok());
        }

        let write_result = tokio::fs::write(&latest_path, legacy_payload.to_string()).await;
        assert!(write_result.is_ok());

        let loaded = store.load_latest(session_id).await;
        assert!(loaded.is_ok());

        let Some(loaded_envelope) = loaded.ok().flatten() else {
            panic!("expected migrated checkpoint envelope");
        };

        assert_eq!(
            loaded_envelope
                .messages
                .first()
                .and_then(stakai::Message::text),
            Some("legacy".to_string())
        );
    }

    /// Loading a session that was never saved yields `Ok(None)`, not an error.
    #[tokio::test]
    async fn load_latest_returns_none_for_missing_session() {
        let fixture = TempStore::new();
        let store = &fixture.store;

        let loaded = store.load_latest(Uuid::new_v4()).await;
        assert!(loaded.is_ok());
        assert!(loaded.ok().flatten().is_none());
    }
}
199}