1use crate::types::{CronJob, JobExecution, Result};
6use async_trait::async_trait;
7use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9use tokio::fs;
10use tokio::io::AsyncWriteExt;
11use tokio::sync::RwLock;
12
13#[async_trait]
15pub trait CronStore: Send + Sync {
16 async fn save_job(&self, job: &CronJob) -> Result<()>;
18
19 async fn load_job(&self, id: &str) -> Result<Option<CronJob>>;
21
22 async fn delete_job(&self, id: &str) -> Result<()>;
24
25 async fn list_jobs(&self) -> Result<Vec<CronJob>>;
27
28 async fn job_exists(&self, id: &str) -> Result<bool>;
30
31 async fn find_job_by_name(&self, name: &str) -> Result<Option<CronJob>>;
33
34 async fn save_execution(&self, execution: &JobExecution) -> Result<()>;
36
37 async fn load_executions(&self, job_id: &str, limit: usize) -> Result<Vec<JobExecution>>;
39
40 async fn delete_executions(&self, job_id: &str) -> Result<()>;
42}
43
44pub struct FileCronStore {
59 jobs_file: PathBuf,
61 history_dir: PathBuf,
63}
64
65impl FileCronStore {
66 pub async fn new<P: AsRef<Path>>(workspace: P) -> Result<Self> {
68 let base_dir = workspace.as_ref().join(".a3s").join("cron");
69 let jobs_file = base_dir.join("jobs.json");
70 let history_dir = base_dir.join("history");
71
72 fs::create_dir_all(&base_dir).await?;
74 fs::create_dir_all(&history_dir).await?;
75
76 if !jobs_file.exists() {
78 let empty: Vec<CronJob> = Vec::new();
79 let json = serde_json::to_string_pretty(&empty)?;
80 fs::write(&jobs_file, json).await?;
81 }
82
83 Ok(Self {
84 jobs_file,
85 history_dir,
86 })
87 }
88
89 async fn load_all_jobs(&self) -> Result<Vec<CronJob>> {
91 let content = fs::read_to_string(&self.jobs_file).await?;
92 let jobs: Vec<CronJob> = serde_json::from_str(&content)?;
93 Ok(jobs)
94 }
95
96 async fn save_all_jobs(&self, jobs: &[CronJob]) -> Result<()> {
98 let json = serde_json::to_string_pretty(jobs)?;
99
100 let temp_path = self.jobs_file.with_extension("json.tmp");
102 let mut file = fs::File::create(&temp_path).await?;
103 file.write_all(json.as_bytes()).await?;
104 file.sync_all().await?;
105 fs::rename(&temp_path, &self.jobs_file).await?;
106
107 Ok(())
108 }
109
110 fn job_history_dir(&self, job_id: &str) -> PathBuf {
112 let safe_id = job_id.replace(['/', '\\'], "_").replace("..", "_");
114 self.history_dir.join(safe_id)
115 }
116}
117
118#[async_trait]
119impl CronStore for FileCronStore {
120 async fn save_job(&self, job: &CronJob) -> Result<()> {
121 let mut jobs = self.load_all_jobs().await?;
122
123 if let Some(existing) = jobs.iter_mut().find(|j| j.id == job.id) {
125 *existing = job.clone();
126 } else {
127 jobs.push(job.clone());
128 }
129
130 self.save_all_jobs(&jobs).await
131 }
132
133 async fn load_job(&self, id: &str) -> Result<Option<CronJob>> {
134 let jobs = self.load_all_jobs().await?;
135 Ok(jobs.into_iter().find(|j| j.id == id))
136 }
137
138 async fn delete_job(&self, id: &str) -> Result<()> {
139 let mut jobs = self.load_all_jobs().await?;
140 jobs.retain(|j| j.id != id);
141 self.save_all_jobs(&jobs).await?;
142
143 self.delete_executions(id).await?;
145
146 Ok(())
147 }
148
149 async fn list_jobs(&self) -> Result<Vec<CronJob>> {
150 self.load_all_jobs().await
151 }
152
153 async fn job_exists(&self, id: &str) -> Result<bool> {
154 let jobs = self.load_all_jobs().await?;
155 Ok(jobs.iter().any(|j| j.id == id))
156 }
157
158 async fn find_job_by_name(&self, name: &str) -> Result<Option<CronJob>> {
159 let jobs = self.load_all_jobs().await?;
160 Ok(jobs.into_iter().find(|j| j.name == name))
161 }
162
163 async fn save_execution(&self, execution: &JobExecution) -> Result<()> {
164 let job_dir = self.job_history_dir(&execution.job_id);
165 fs::create_dir_all(&job_dir).await?;
166
167 let filename = format!("{}.json", execution.started_at.timestamp_millis());
168 let path = job_dir.join(filename);
169
170 let json = serde_json::to_string_pretty(execution)?;
171 fs::write(&path, json).await?;
172
173 Ok(())
174 }
175
176 async fn load_executions(&self, job_id: &str, limit: usize) -> Result<Vec<JobExecution>> {
177 let job_dir = self.job_history_dir(job_id);
178
179 if !job_dir.exists() {
180 return Ok(Vec::new());
181 }
182
183 let mut executions = Vec::new();
184 let mut entries = fs::read_dir(&job_dir).await?;
185
186 while let Some(entry) = entries.next_entry().await? {
187 let path = entry.path();
188 if path.extension().is_some_and(|ext| ext == "json") {
189 let content = fs::read_to_string(&path).await?;
190 if let Ok(exec) = serde_json::from_str::<JobExecution>(&content) {
191 executions.push(exec);
192 }
193 }
194 }
195
196 executions.sort_by(|a, b| b.started_at.cmp(&a.started_at));
198
199 executions.truncate(limit);
201
202 Ok(executions)
203 }
204
205 async fn delete_executions(&self, job_id: &str) -> Result<()> {
206 let job_dir = self.job_history_dir(job_id);
207
208 if job_dir.exists() {
209 fs::remove_dir_all(&job_dir).await?;
210 }
211
212 Ok(())
213 }
214}
215
216pub struct MemoryCronStore {
222 jobs: RwLock<HashMap<String, CronJob>>,
223 executions: RwLock<HashMap<String, Vec<JobExecution>>>,
224}
225
226impl MemoryCronStore {
227 pub fn new() -> Self {
229 Self {
230 jobs: RwLock::new(HashMap::new()),
231 executions: RwLock::new(HashMap::new()),
232 }
233 }
234}
235
236impl Default for MemoryCronStore {
237 fn default() -> Self {
238 Self::new()
239 }
240}
241
242#[async_trait]
243impl CronStore for MemoryCronStore {
244 async fn save_job(&self, job: &CronJob) -> Result<()> {
245 let mut jobs = self.jobs.write().await;
246 jobs.insert(job.id.clone(), job.clone());
247 Ok(())
248 }
249
250 async fn load_job(&self, id: &str) -> Result<Option<CronJob>> {
251 let jobs = self.jobs.read().await;
252 Ok(jobs.get(id).cloned())
253 }
254
255 async fn delete_job(&self, id: &str) -> Result<()> {
256 let mut jobs = self.jobs.write().await;
257 jobs.remove(id);
258
259 let mut executions = self.executions.write().await;
260 executions.remove(id);
261
262 Ok(())
263 }
264
265 async fn list_jobs(&self) -> Result<Vec<CronJob>> {
266 let jobs = self.jobs.read().await;
267 Ok(jobs.values().cloned().collect())
268 }
269
270 async fn job_exists(&self, id: &str) -> Result<bool> {
271 let jobs = self.jobs.read().await;
272 Ok(jobs.contains_key(id))
273 }
274
275 async fn find_job_by_name(&self, name: &str) -> Result<Option<CronJob>> {
276 let jobs = self.jobs.read().await;
277 Ok(jobs.values().find(|j| j.name == name).cloned())
278 }
279
280 async fn save_execution(&self, execution: &JobExecution) -> Result<()> {
281 let mut executions = self.executions.write().await;
282 executions
283 .entry(execution.job_id.clone())
284 .or_default()
285 .push(execution.clone());
286 Ok(())
287 }
288
289 async fn load_executions(&self, job_id: &str, limit: usize) -> Result<Vec<JobExecution>> {
290 let executions = self.executions.read().await;
291 let mut result = executions.get(job_id).cloned().unwrap_or_default();
292 result.sort_by(|a, b| b.started_at.cmp(&a.started_at));
293 result.truncate(limit);
294 Ok(result)
295 }
296
297 async fn delete_executions(&self, job_id: &str) -> Result<()> {
298 let mut executions = self.executions.write().await;
299 executions.remove(job_id);
300 Ok(())
301 }
302}
303
304#[cfg(test)]
305mod tests {
306 use super::*;
307 use tempfile::tempdir;
308
309 #[tokio::test]
314 async fn test_memory_store_save_and_load() {
315 let store = MemoryCronStore::new();
316 let job = CronJob::new("test", "* * * * *", "echo hello");
317
318 store.save_job(&job).await.unwrap();
319
320 let loaded = store.load_job(&job.id).await.unwrap();
321 assert!(loaded.is_some());
322 assert_eq!(loaded.unwrap().name, "test");
323 }
324
325 #[tokio::test]
326 async fn test_memory_store_delete() {
327 let store = MemoryCronStore::new();
328 let job = CronJob::new("test", "* * * * *", "echo hello");
329
330 store.save_job(&job).await.unwrap();
331 assert!(store.job_exists(&job.id).await.unwrap());
332
333 store.delete_job(&job.id).await.unwrap();
334 assert!(!store.job_exists(&job.id).await.unwrap());
335 }
336
337 #[tokio::test]
338 async fn test_memory_store_list() {
339 let store = MemoryCronStore::new();
340
341 for i in 1..=3 {
342 let job = CronJob::new(format!("job-{}", i), "* * * * *", "echo");
343 store.save_job(&job).await.unwrap();
344 }
345
346 let jobs = store.list_jobs().await.unwrap();
347 assert_eq!(jobs.len(), 3);
348 }
349
350 #[tokio::test]
351 async fn test_memory_store_find_by_name() {
352 let store = MemoryCronStore::new();
353 let job = CronJob::new("unique-name", "* * * * *", "echo");
354 store.save_job(&job).await.unwrap();
355
356 let found = store.find_job_by_name("unique-name").await.unwrap();
357 assert!(found.is_some());
358 assert_eq!(found.unwrap().id, job.id);
359
360 let not_found = store.find_job_by_name("nonexistent").await.unwrap();
361 assert!(not_found.is_none());
362 }
363
364 #[tokio::test]
365 async fn test_memory_store_executions() {
366 let store = MemoryCronStore::new();
367 let job = CronJob::new("test", "* * * * *", "echo");
368 store.save_job(&job).await.unwrap();
369
370 for _ in 0..5 {
372 let exec = JobExecution::new(&job.id);
373 store.save_execution(&exec).await.unwrap();
374 }
375
376 let executions = store.load_executions(&job.id, 10).await.unwrap();
377 assert_eq!(executions.len(), 5);
378
379 let limited = store.load_executions(&job.id, 2).await.unwrap();
381 assert_eq!(limited.len(), 2);
382 }
383
384 #[tokio::test]
389 async fn test_file_store_save_and_load() {
390 let dir = tempdir().unwrap();
391 let store = FileCronStore::new(dir.path()).await.unwrap();
392
393 let job = CronJob::new("test", "* * * * *", "echo hello");
394 store.save_job(&job).await.unwrap();
395
396 let loaded = store.load_job(&job.id).await.unwrap();
397 assert!(loaded.is_some());
398 assert_eq!(loaded.unwrap().name, "test");
399 }
400
401 #[tokio::test]
402 async fn test_file_store_persistence() {
403 let dir = tempdir().unwrap();
404
405 {
407 let store = FileCronStore::new(dir.path()).await.unwrap();
408 let job = CronJob::new("persistent", "0 * * * *", "backup.sh");
409 store.save_job(&job).await.unwrap();
410 }
411
412 {
414 let store = FileCronStore::new(dir.path()).await.unwrap();
415 let jobs = store.list_jobs().await.unwrap();
416 assert_eq!(jobs.len(), 1);
417 assert_eq!(jobs[0].name, "persistent");
418 }
419 }
420
421 #[tokio::test]
422 async fn test_file_store_delete() {
423 let dir = tempdir().unwrap();
424 let store = FileCronStore::new(dir.path()).await.unwrap();
425
426 let job = CronJob::new("to-delete", "* * * * *", "echo");
427 store.save_job(&job).await.unwrap();
428
429 store.delete_job(&job.id).await.unwrap();
430
431 let loaded = store.load_job(&job.id).await.unwrap();
432 assert!(loaded.is_none());
433 }
434
435 #[tokio::test]
436 async fn test_file_store_update() {
437 let dir = tempdir().unwrap();
438 let store = FileCronStore::new(dir.path()).await.unwrap();
439
440 let mut job = CronJob::new("updatable", "* * * * *", "echo v1");
441 store.save_job(&job).await.unwrap();
442
443 job.command = "echo v2".to_string();
445 store.save_job(&job).await.unwrap();
446
447 let loaded = store.load_job(&job.id).await.unwrap().unwrap();
448 assert_eq!(loaded.command, "echo v2");
449
450 let jobs = store.list_jobs().await.unwrap();
452 assert_eq!(jobs.len(), 1);
453 }
454
455 #[tokio::test]
456 async fn test_file_store_executions() {
457 let dir = tempdir().unwrap();
458 let store = FileCronStore::new(dir.path()).await.unwrap();
459
460 let job = CronJob::new("test", "* * * * *", "echo");
461 store.save_job(&job).await.unwrap();
462
463 for _ in 0..3 {
465 let exec = JobExecution::new(&job.id);
466 store.save_execution(&exec).await.unwrap();
467 tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
468 }
469
470 let executions = store.load_executions(&job.id, 10).await.unwrap();
471 assert_eq!(executions.len(), 3);
472
473 store.delete_job(&job.id).await.unwrap();
475 let executions = store.load_executions(&job.id, 10).await.unwrap();
476 assert!(executions.is_empty());
477 }
478}