Skip to main content

deepseek/agent/scheduler/
store.rs

1//! JSON persistence for [`super::Scheduler`] state.
2//!
3//! Tasks are stored at `${cache_dir}/deepseek-loop/sessions/<session_id>/tasks.json`.
4//! Writes are atomic (tempfile + rename).
5
6use std::fs;
7use std::io;
8use std::path::{Path, PathBuf};
9
10use serde::{Deserialize, Serialize};
11
12use super::Task;
13
14/// File-level wrapper so we can extend later without breaking older readers.
15#[derive(Debug, Clone, Serialize, Deserialize)]
16struct StoreFile {
17    /// Schema version; bumped on incompatible changes.
18    version: u32,
19    tasks: Vec<Task>,
20}
21
/// Schema version stamped into every file written by `save_to`. The loaders
/// only accept files carrying exactly this version; any other version is
/// treated as "no tasks".
const SCHEMA_VERSION: u32 = 1;
23
24/// Resolve the on-disk directory for `session_id`.
25pub fn session_dir(session_id: &str) -> Option<PathBuf> {
26    dirs::cache_dir().map(|d| d.join("deepseek-loop").join("sessions").join(session_id))
27}
28
29/// Load tasks for `session_id`. Returns an empty vec when no file exists.
30pub fn load(session_id: &str) -> io::Result<Vec<Task>> {
31    let Some(dir) = session_dir(session_id) else {
32        return Ok(Vec::new());
33    };
34    let path = dir.join("tasks.json");
35    if !path.exists() {
36        return Ok(Vec::new());
37    }
38    let raw = fs::read_to_string(&path)?;
39    let store: StoreFile = serde_json::from_str(&raw).map_err(io::Error::other)?;
40    if store.version != SCHEMA_VERSION {
41        // Unknown future schema → treat as empty rather than crashing.
42        return Ok(Vec::new());
43    }
44    Ok(store.tasks)
45}
46
47/// Atomically persist `tasks` for `session_id`.
48pub fn save(session_id: &str, tasks: &[Task]) -> io::Result<()> {
49    let Some(dir) = session_dir(session_id) else {
50        return Ok(()); // no cache dir on this platform → silently skip
51    };
52    save_to(&dir, tasks)
53}
54
55/// Save into an explicit directory (testable path).
56pub fn save_to(dir: &Path, tasks: &[Task]) -> io::Result<()> {
57    fs::create_dir_all(dir)?;
58    let store = StoreFile {
59        version: SCHEMA_VERSION,
60        tasks: tasks.to_vec(),
61    };
62    let json = serde_json::to_vec_pretty(&store).map_err(io::Error::other)?;
63    let mut tmp = tempfile::NamedTempFile::new_in(dir)?;
64    use std::io::Write;
65    tmp.write_all(&json)?;
66    tmp.flush()?;
67    let target = dir.join("tasks.json");
68    tmp.persist(&target).map_err(|e| e.error)?;
69    Ok(())
70}
71
72/// Load from an explicit directory (testable path).
73pub fn load_from(dir: &Path) -> io::Result<Vec<Task>> {
74    let path = dir.join("tasks.json");
75    if !path.exists() {
76        return Ok(Vec::new());
77    }
78    let raw = fs::read_to_string(&path)?;
79    let store: StoreFile = serde_json::from_str(&raw).map_err(io::Error::other)?;
80    if store.version != SCHEMA_VERSION {
81        return Ok(Vec::new());
82    }
83    Ok(store.tasks)
84}
85
#[cfg(test)]
mod tests {
    use super::*;
    use crate::agent::scheduler::{Schedule, Task, TaskId};
    use chrono::Utc;

    /// Build a non-recurring `Schedule::Dynamic` task with the given id and
    /// prompt, timestamps set to "now", and no expiry. Shared by the tests
    /// that only need *some* valid task.
    fn dynamic_task(id: TaskId, prompt: &'static str) -> Task {
        let now = Utc::now();
        Task {
            id,
            schedule: Schedule::Dynamic,
            prompt: prompt.into(),
            recurring: false,
            created_at: now,
            next_fire: now,
            expires_at: None,
        }
    }

    #[test]
    fn round_trip() {
        let dir = tempfile::tempdir().unwrap();
        let task = dynamic_task(TaskId::new(), "test");
        save_to(dir.path(), &[task.clone()]).unwrap();
        let loaded = load_from(dir.path()).unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].id.as_str(), task.id.as_str());
        assert_eq!(loaded[0].prompt, task.prompt);
    }

    #[test]
    fn missing_file_returns_empty() {
        let dir = tempfile::tempdir().unwrap();
        assert!(load_from(dir.path()).unwrap().is_empty());
    }

    #[test]
    fn unknown_schema_version_returns_empty() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("tasks.json");
        // Forge a future-version file; load_from should treat it as empty
        // rather than panicking or surfacing a deserialization error.
        std::fs::write(&path, r#"{"version":99,"tasks":[]}"#).unwrap();
        assert!(load_from(dir.path()).unwrap().is_empty());
    }

    #[test]
    fn corrupt_json_returns_error() {
        let dir = tempfile::tempdir().unwrap();
        let path = dir.path().join("tasks.json");
        std::fs::write(&path, "not json at all").unwrap();
        assert!(load_from(dir.path()).is_err());
    }

    #[test]
    fn save_creates_directory_if_missing() {
        let parent = tempfile::tempdir().unwrap();
        let nested = parent.path().join("does/not/yet/exist");
        let task = dynamic_task(TaskId::new(), "p");
        save_to(&nested, &[task.clone()]).unwrap();
        let loaded = load_from(&nested).unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].id, task.id);
    }

    #[test]
    fn save_is_atomic_no_tmp_leftover() {
        // After a successful save, only `tasks.json` should be present —
        // tempfile::persist atomically renames, so no `.tmp*` leftover.
        let dir = tempfile::tempdir().unwrap();
        save_to(dir.path(), &[dynamic_task(TaskId::new(), "p")]).unwrap();

        let entries: Vec<String> = std::fs::read_dir(dir.path())
            .unwrap()
            .map(|e| e.unwrap().file_name().to_string_lossy().to_string())
            .collect();
        assert_eq!(entries, vec!["tasks.json"], "got entries: {entries:?}");
    }

    #[test]
    fn round_trip_preserves_schedule_kind() {
        // Each Schedule variant must survive serde round-trip.
        let dir = tempfile::tempdir().unwrap();
        let now = Utc::now();
        let cron = crate::agent::scheduler::CronExpr::parse("*/15 * * * *").unwrap();
        let tasks = vec![
            Task {
                id: TaskId::new(),
                schedule: Schedule::Cron(Box::new(cron)),
                prompt: "cron".into(),
                recurring: true,
                created_at: now,
                next_fire: now,
                expires_at: Some(now),
            },
            Task {
                id: TaskId::new(),
                schedule: Schedule::Once { at: now },
                prompt: "once".into(),
                recurring: false,
                created_at: now,
                next_fire: now,
                expires_at: None,
            },
            Task {
                id: TaskId::new(),
                schedule: Schedule::Dynamic,
                prompt: "dynamic".into(),
                recurring: true,
                created_at: now,
                next_fire: now,
                expires_at: Some(now),
            },
        ];
        save_to(dir.path(), &tasks).unwrap();
        let loaded = load_from(dir.path()).unwrap();
        assert_eq!(loaded.len(), 3);
        let kinds: Vec<&'static str> = loaded
            .iter()
            .map(|t| match &t.schedule {
                Schedule::Cron(_) => "cron",
                Schedule::Once { .. } => "once",
                Schedule::Dynamic => "dynamic",
            })
            .collect();
        assert_eq!(kinds, vec!["cron", "once", "dynamic"]);
    }

    #[test]
    fn save_overwrites_previous_file() {
        let dir = tempfile::tempdir().unwrap();
        save_to(dir.path(), &[dynamic_task(TaskId::from_raw("FIRST"), "v1")]).unwrap();
        save_to(dir.path(), &[dynamic_task(TaskId::from_raw("SECOND"), "v2")]).unwrap();
        let loaded = load_from(dir.path()).unwrap();
        assert_eq!(loaded.len(), 1);
        assert_eq!(loaded[0].id.as_str(), "SECOND");
    }
}
253}