Skip to main content

stakpak_server/
checkpoint_store.rs

1use std::path::{Path, PathBuf};
2
3use stakpak_agent_core::{CheckpointEnvelopeV1, deserialize_checkpoint, serialize_checkpoint};
4use uuid::Uuid;
5
6#[derive(Debug, Clone)]
7pub struct CheckpointStore {
8    root: PathBuf,
9}
10
11impl CheckpointStore {
12    pub fn new(root: impl Into<PathBuf>) -> Self {
13        let root = root.into();
14        let _ = std::fs::create_dir_all(&root);
15        Self { root }
16    }
17
18    pub fn default_local() -> Self {
19        let root = std::env::var("HOME")
20            .map(|home| {
21                PathBuf::from(home)
22                    .join(".stakpak")
23                    .join("server")
24                    .join("checkpoints")
25            })
26            .unwrap_or_else(|_| PathBuf::from(".stakpak").join("server").join("checkpoints"));
27
28        Self::new(root)
29    }
30
31    pub fn root(&self) -> &Path {
32        &self.root
33    }
34
35    pub async fn load_latest(
36        &self,
37        session_id: Uuid,
38    ) -> Result<Option<CheckpointEnvelopeV1>, String> {
39        let latest_path = self.latest_path(session_id);
40
41        let payload = match tokio::fs::read(&latest_path).await {
42            Ok(payload) => payload,
43            Err(error) if error.kind() == std::io::ErrorKind::NotFound => return Ok(None),
44            Err(error) => {
45                return Err(format!(
46                    "Failed to read checkpoint envelope from {}: {}",
47                    latest_path.display(),
48                    error
49                ));
50            }
51        };
52
53        deserialize_checkpoint(&payload)
54            .map(Some)
55            .map_err(|error| format!("Failed to deserialize checkpoint envelope: {error}"))
56    }
57
58    pub async fn save_latest(
59        &self,
60        session_id: Uuid,
61        envelope: &CheckpointEnvelopeV1,
62    ) -> Result<(), String> {
63        let latest_path = self.latest_path(session_id);
64        if let Some(parent) = latest_path.parent() {
65            tokio::fs::create_dir_all(parent).await.map_err(|error| {
66                format!(
67                    "Failed to create checkpoint directory {}: {}",
68                    parent.display(),
69                    error
70                )
71            })?;
72        }
73
74        let payload = serialize_checkpoint(envelope)
75            .map_err(|error| format!("Failed to serialize checkpoint envelope: {error}"))?;
76
77        let temp_path = latest_path.with_extension("tmp");
78
79        tokio::fs::write(&temp_path, payload)
80            .await
81            .map_err(|error| {
82                format!(
83                    "Failed to write temporary checkpoint envelope {}: {}",
84                    temp_path.display(),
85                    error
86                )
87            })?;
88
89        tokio::fs::rename(&temp_path, &latest_path)
90            .await
91            .map_err(|error| {
92                format!(
93                    "Failed to finalize checkpoint envelope {}: {}",
94                    latest_path.display(),
95                    error
96                )
97            })?;
98
99        Ok(())
100    }
101
102    fn latest_path(&self, session_id: Uuid) -> PathBuf {
103        self.root
104            .join(session_id.to_string())
105            .join("latest.checkpoint")
106    }
107}
108
109#[cfg(test)]
110mod tests {
111    use super::*;
112    use serde_json::json;
113    use stakai::{Message, Role};
114
115    #[tokio::test]
116    async fn save_and_load_checkpoint_envelope_roundtrip() {
117        let root = std::env::temp_dir().join(format!("stakpak-checkpoint-test-{}", Uuid::new_v4()));
118        let store = CheckpointStore::new(root);
119        let session_id = Uuid::new_v4();
120        let run_id = Uuid::new_v4();
121
122        let envelope = CheckpointEnvelopeV1::new(
123            Some(run_id),
124            vec![Message::new(Role::User, "hello")],
125            json!({"turn": 1}),
126        );
127
128        let save = store.save_latest(session_id, &envelope).await;
129        assert!(save.is_ok());
130
131        let loaded = store.load_latest(session_id).await;
132        assert!(loaded.is_ok());
133
134        let Some(loaded_envelope) = loaded.ok().flatten() else {
135            panic!("expected checkpoint envelope");
136        };
137
138        assert_eq!(loaded_envelope.run_id, Some(run_id));
139        assert_eq!(
140            loaded_envelope
141                .messages
142                .first()
143                .and_then(stakai::Message::text),
144            Some("hello".to_string())
145        );
146    }
147
148    #[tokio::test]
149    async fn load_latest_migrates_legacy_messages_array_payload() {
150        let root = std::env::temp_dir().join(format!("stakpak-checkpoint-test-{}", Uuid::new_v4()));
151        let store = CheckpointStore::new(root);
152        let session_id = Uuid::new_v4();
153
154        let legacy_payload = json!([
155            {
156                "role": "user",
157                "content": "legacy"
158            }
159        ]);
160
161        let latest_path = store
162            .root()
163            .join(session_id.to_string())
164            .join("latest.checkpoint");
165
166        if let Some(parent) = latest_path.parent() {
167            let create_dir = tokio::fs::create_dir_all(parent).await;
168            assert!(create_dir.is_ok());
169        }
170
171        let write_result = tokio::fs::write(&latest_path, legacy_payload.to_string()).await;
172        assert!(write_result.is_ok());
173
174        let loaded = store.load_latest(session_id).await;
175        assert!(loaded.is_ok());
176
177        let Some(loaded_envelope) = loaded.ok().flatten() else {
178            panic!("expected migrated checkpoint envelope");
179        };
180
181        assert_eq!(
182            loaded_envelope
183                .messages
184                .first()
185                .and_then(stakai::Message::text),
186            Some("legacy".to_string())
187        );
188    }
189
190    #[tokio::test]
191    async fn load_latest_returns_none_for_missing_session() {
192        let root = std::env::temp_dir().join(format!("stakpak-checkpoint-test-{}", Uuid::new_v4()));
193        let store = CheckpointStore::new(root);
194
195        let loaded = store.load_latest(Uuid::new_v4()).await;
196        assert!(loaded.is_ok());
197        assert!(loaded.ok().flatten().is_none());
198    }
199}