Skip to main content

a3s_cron/
store.rs

1//! Cron job persistence layer
2//!
3//! Provides pluggable storage backends for cron jobs and execution history.
4
5use crate::types::{CronJob, JobExecution, Result};
6use async_trait::async_trait;
7use std::collections::HashMap;
8use std::path::{Path, PathBuf};
9use tokio::fs;
10use tokio::io::AsyncWriteExt;
11use tokio::sync::RwLock;
12
13/// Cron storage trait
14#[async_trait]
15pub trait CronStore: Send + Sync {
16    /// Save a job
17    async fn save_job(&self, job: &CronJob) -> Result<()>;
18
19    /// Load a job by ID
20    async fn load_job(&self, id: &str) -> Result<Option<CronJob>>;
21
22    /// Delete a job
23    async fn delete_job(&self, id: &str) -> Result<()>;
24
25    /// List all jobs
26    async fn list_jobs(&self) -> Result<Vec<CronJob>>;
27
28    /// Check if a job exists
29    async fn job_exists(&self, id: &str) -> Result<bool>;
30
31    /// Find a job by name
32    async fn find_job_by_name(&self, name: &str) -> Result<Option<CronJob>>;
33
34    /// Save an execution record
35    async fn save_execution(&self, execution: &JobExecution) -> Result<()>;
36
37    /// Load execution history for a job
38    async fn load_executions(&self, job_id: &str, limit: usize) -> Result<Vec<JobExecution>>;
39
40    /// Delete all executions for a job
41    async fn delete_executions(&self, job_id: &str) -> Result<()>;
42}
43
44// ============================================================================
45// File-based Store
46// ============================================================================
47
48/// File-based cron store
49///
50/// Stores jobs and executions as JSON files:
51/// ```text
52/// .a3s/cron/
53///   jobs.json           # All job definitions
54///   history/
55///     {job-id}/
56///       {timestamp}.json  # Execution records
57/// ```
58pub struct FileCronStore {
59    /// Jobs file path
60    jobs_file: PathBuf,
61    /// History directory path
62    history_dir: PathBuf,
63}
64
65impl FileCronStore {
66    /// Create a new file-based store
67    pub async fn new<P: AsRef<Path>>(workspace: P) -> Result<Self> {
68        let base_dir = workspace.as_ref().join(".a3s").join("cron");
69        let jobs_file = base_dir.join("jobs.json");
70        let history_dir = base_dir.join("history");
71
72        // Create directories
73        fs::create_dir_all(&base_dir).await?;
74        fs::create_dir_all(&history_dir).await?;
75
76        // Initialize empty jobs file if it doesn't exist
77        if !jobs_file.exists() {
78            let empty: Vec<CronJob> = Vec::new();
79            let json = serde_json::to_string_pretty(&empty)?;
80            fs::write(&jobs_file, json).await?;
81        }
82
83        Ok(Self {
84            jobs_file,
85            history_dir,
86        })
87    }
88
89    /// Load all jobs from file
90    async fn load_all_jobs(&self) -> Result<Vec<CronJob>> {
91        let content = fs::read_to_string(&self.jobs_file).await?;
92        let jobs: Vec<CronJob> = serde_json::from_str(&content)?;
93        Ok(jobs)
94    }
95
96    /// Save all jobs to file
97    async fn save_all_jobs(&self, jobs: &[CronJob]) -> Result<()> {
98        let json = serde_json::to_string_pretty(jobs)?;
99
100        // Write atomically
101        let temp_path = self.jobs_file.with_extension("json.tmp");
102        let mut file = fs::File::create(&temp_path).await?;
103        file.write_all(json.as_bytes()).await?;
104        file.sync_all().await?;
105        fs::rename(&temp_path, &self.jobs_file).await?;
106
107        Ok(())
108    }
109
110    /// Get the history directory for a job
111    fn job_history_dir(&self, job_id: &str) -> PathBuf {
112        // Sanitize job ID
113        let safe_id = job_id.replace(['/', '\\'], "_").replace("..", "_");
114        self.history_dir.join(safe_id)
115    }
116}
117
118#[async_trait]
119impl CronStore for FileCronStore {
120    async fn save_job(&self, job: &CronJob) -> Result<()> {
121        let mut jobs = self.load_all_jobs().await?;
122
123        // Update or insert
124        if let Some(existing) = jobs.iter_mut().find(|j| j.id == job.id) {
125            *existing = job.clone();
126        } else {
127            jobs.push(job.clone());
128        }
129
130        self.save_all_jobs(&jobs).await
131    }
132
133    async fn load_job(&self, id: &str) -> Result<Option<CronJob>> {
134        let jobs = self.load_all_jobs().await?;
135        Ok(jobs.into_iter().find(|j| j.id == id))
136    }
137
138    async fn delete_job(&self, id: &str) -> Result<()> {
139        let mut jobs = self.load_all_jobs().await?;
140        jobs.retain(|j| j.id != id);
141        self.save_all_jobs(&jobs).await?;
142
143        // Also delete execution history
144        self.delete_executions(id).await?;
145
146        Ok(())
147    }
148
149    async fn list_jobs(&self) -> Result<Vec<CronJob>> {
150        self.load_all_jobs().await
151    }
152
153    async fn job_exists(&self, id: &str) -> Result<bool> {
154        let jobs = self.load_all_jobs().await?;
155        Ok(jobs.iter().any(|j| j.id == id))
156    }
157
158    async fn find_job_by_name(&self, name: &str) -> Result<Option<CronJob>> {
159        let jobs = self.load_all_jobs().await?;
160        Ok(jobs.into_iter().find(|j| j.name == name))
161    }
162
163    async fn save_execution(&self, execution: &JobExecution) -> Result<()> {
164        let job_dir = self.job_history_dir(&execution.job_id);
165        fs::create_dir_all(&job_dir).await?;
166
167        let filename = format!("{}.json", execution.started_at.timestamp_millis());
168        let path = job_dir.join(filename);
169
170        let json = serde_json::to_string_pretty(execution)?;
171        fs::write(&path, json).await?;
172
173        Ok(())
174    }
175
176    async fn load_executions(&self, job_id: &str, limit: usize) -> Result<Vec<JobExecution>> {
177        let job_dir = self.job_history_dir(job_id);
178
179        if !job_dir.exists() {
180            return Ok(Vec::new());
181        }
182
183        let mut executions = Vec::new();
184        let mut entries = fs::read_dir(&job_dir).await?;
185
186        while let Some(entry) = entries.next_entry().await? {
187            let path = entry.path();
188            if path.extension().is_some_and(|ext| ext == "json") {
189                let content = fs::read_to_string(&path).await?;
190                if let Ok(exec) = serde_json::from_str::<JobExecution>(&content) {
191                    executions.push(exec);
192                }
193            }
194        }
195
196        // Sort by start time descending (most recent first)
197        executions.sort_by(|a, b| b.started_at.cmp(&a.started_at));
198
199        // Limit results
200        executions.truncate(limit);
201
202        Ok(executions)
203    }
204
205    async fn delete_executions(&self, job_id: &str) -> Result<()> {
206        let job_dir = self.job_history_dir(job_id);
207
208        if job_dir.exists() {
209            fs::remove_dir_all(&job_dir).await?;
210        }
211
212        Ok(())
213    }
214}
215
216// ============================================================================
217// In-Memory Store (for testing)
218// ============================================================================
219
220/// In-memory cron store for testing
221pub struct MemoryCronStore {
222    jobs: RwLock<HashMap<String, CronJob>>,
223    executions: RwLock<HashMap<String, Vec<JobExecution>>>,
224}
225
226impl MemoryCronStore {
227    /// Create a new in-memory store
228    pub fn new() -> Self {
229        Self {
230            jobs: RwLock::new(HashMap::new()),
231            executions: RwLock::new(HashMap::new()),
232        }
233    }
234}
235
236impl Default for MemoryCronStore {
237    fn default() -> Self {
238        Self::new()
239    }
240}
241
242#[async_trait]
243impl CronStore for MemoryCronStore {
244    async fn save_job(&self, job: &CronJob) -> Result<()> {
245        let mut jobs = self.jobs.write().await;
246        jobs.insert(job.id.clone(), job.clone());
247        Ok(())
248    }
249
250    async fn load_job(&self, id: &str) -> Result<Option<CronJob>> {
251        let jobs = self.jobs.read().await;
252        Ok(jobs.get(id).cloned())
253    }
254
255    async fn delete_job(&self, id: &str) -> Result<()> {
256        let mut jobs = self.jobs.write().await;
257        jobs.remove(id);
258
259        let mut executions = self.executions.write().await;
260        executions.remove(id);
261
262        Ok(())
263    }
264
265    async fn list_jobs(&self) -> Result<Vec<CronJob>> {
266        let jobs = self.jobs.read().await;
267        Ok(jobs.values().cloned().collect())
268    }
269
270    async fn job_exists(&self, id: &str) -> Result<bool> {
271        let jobs = self.jobs.read().await;
272        Ok(jobs.contains_key(id))
273    }
274
275    async fn find_job_by_name(&self, name: &str) -> Result<Option<CronJob>> {
276        let jobs = self.jobs.read().await;
277        Ok(jobs.values().find(|j| j.name == name).cloned())
278    }
279
280    async fn save_execution(&self, execution: &JobExecution) -> Result<()> {
281        let mut executions = self.executions.write().await;
282        executions
283            .entry(execution.job_id.clone())
284            .or_default()
285            .push(execution.clone());
286        Ok(())
287    }
288
289    async fn load_executions(&self, job_id: &str, limit: usize) -> Result<Vec<JobExecution>> {
290        let executions = self.executions.read().await;
291        let mut result = executions.get(job_id).cloned().unwrap_or_default();
292        result.sort_by(|a, b| b.started_at.cmp(&a.started_at));
293        result.truncate(limit);
294        Ok(result)
295    }
296
297    async fn delete_executions(&self, job_id: &str) -> Result<()> {
298        let mut executions = self.executions.write().await;
299        executions.remove(job_id);
300        Ok(())
301    }
302}
303
304#[cfg(test)]
305mod tests {
306    use super::*;
307    use tempfile::tempdir;
308
309    // ========================================================================
310    // MemoryCronStore Tests
311    // ========================================================================
312
313    #[tokio::test]
314    async fn test_memory_store_save_and_load() {
315        let store = MemoryCronStore::new();
316        let job = CronJob::new("test", "* * * * *", "echo hello");
317
318        store.save_job(&job).await.unwrap();
319
320        let loaded = store.load_job(&job.id).await.unwrap();
321        assert!(loaded.is_some());
322        assert_eq!(loaded.unwrap().name, "test");
323    }
324
325    #[tokio::test]
326    async fn test_memory_store_delete() {
327        let store = MemoryCronStore::new();
328        let job = CronJob::new("test", "* * * * *", "echo hello");
329
330        store.save_job(&job).await.unwrap();
331        assert!(store.job_exists(&job.id).await.unwrap());
332
333        store.delete_job(&job.id).await.unwrap();
334        assert!(!store.job_exists(&job.id).await.unwrap());
335    }
336
337    #[tokio::test]
338    async fn test_memory_store_list() {
339        let store = MemoryCronStore::new();
340
341        for i in 1..=3 {
342            let job = CronJob::new(format!("job-{}", i), "* * * * *", "echo");
343            store.save_job(&job).await.unwrap();
344        }
345
346        let jobs = store.list_jobs().await.unwrap();
347        assert_eq!(jobs.len(), 3);
348    }
349
350    #[tokio::test]
351    async fn test_memory_store_find_by_name() {
352        let store = MemoryCronStore::new();
353        let job = CronJob::new("unique-name", "* * * * *", "echo");
354        store.save_job(&job).await.unwrap();
355
356        let found = store.find_job_by_name("unique-name").await.unwrap();
357        assert!(found.is_some());
358        assert_eq!(found.unwrap().id, job.id);
359
360        let not_found = store.find_job_by_name("nonexistent").await.unwrap();
361        assert!(not_found.is_none());
362    }
363
364    #[tokio::test]
365    async fn test_memory_store_executions() {
366        let store = MemoryCronStore::new();
367        let job = CronJob::new("test", "* * * * *", "echo");
368        store.save_job(&job).await.unwrap();
369
370        // Save some executions
371        for _ in 0..5 {
372            let exec = JobExecution::new(&job.id);
373            store.save_execution(&exec).await.unwrap();
374        }
375
376        let executions = store.load_executions(&job.id, 10).await.unwrap();
377        assert_eq!(executions.len(), 5);
378
379        // Test limit
380        let limited = store.load_executions(&job.id, 2).await.unwrap();
381        assert_eq!(limited.len(), 2);
382    }
383
384    // ========================================================================
385    // FileCronStore Tests
386    // ========================================================================
387
388    #[tokio::test]
389    async fn test_file_store_save_and_load() {
390        let dir = tempdir().unwrap();
391        let store = FileCronStore::new(dir.path()).await.unwrap();
392
393        let job = CronJob::new("test", "* * * * *", "echo hello");
394        store.save_job(&job).await.unwrap();
395
396        let loaded = store.load_job(&job.id).await.unwrap();
397        assert!(loaded.is_some());
398        assert_eq!(loaded.unwrap().name, "test");
399    }
400
401    #[tokio::test]
402    async fn test_file_store_persistence() {
403        let dir = tempdir().unwrap();
404
405        // Create store and save job
406        {
407            let store = FileCronStore::new(dir.path()).await.unwrap();
408            let job = CronJob::new("persistent", "0 * * * *", "backup.sh");
409            store.save_job(&job).await.unwrap();
410        }
411
412        // Create new store instance and verify job persists
413        {
414            let store = FileCronStore::new(dir.path()).await.unwrap();
415            let jobs = store.list_jobs().await.unwrap();
416            assert_eq!(jobs.len(), 1);
417            assert_eq!(jobs[0].name, "persistent");
418        }
419    }
420
421    #[tokio::test]
422    async fn test_file_store_delete() {
423        let dir = tempdir().unwrap();
424        let store = FileCronStore::new(dir.path()).await.unwrap();
425
426        let job = CronJob::new("to-delete", "* * * * *", "echo");
427        store.save_job(&job).await.unwrap();
428
429        store.delete_job(&job.id).await.unwrap();
430
431        let loaded = store.load_job(&job.id).await.unwrap();
432        assert!(loaded.is_none());
433    }
434
435    #[tokio::test]
436    async fn test_file_store_update() {
437        let dir = tempdir().unwrap();
438        let store = FileCronStore::new(dir.path()).await.unwrap();
439
440        let mut job = CronJob::new("updatable", "* * * * *", "echo v1");
441        store.save_job(&job).await.unwrap();
442
443        // Update the job
444        job.command = "echo v2".to_string();
445        store.save_job(&job).await.unwrap();
446
447        let loaded = store.load_job(&job.id).await.unwrap().unwrap();
448        assert_eq!(loaded.command, "echo v2");
449
450        // Should still be only one job
451        let jobs = store.list_jobs().await.unwrap();
452        assert_eq!(jobs.len(), 1);
453    }
454
455    #[tokio::test]
456    async fn test_file_store_executions() {
457        let dir = tempdir().unwrap();
458        let store = FileCronStore::new(dir.path()).await.unwrap();
459
460        let job = CronJob::new("test", "* * * * *", "echo");
461        store.save_job(&job).await.unwrap();
462
463        // Save executions
464        for _ in 0..3 {
465            let exec = JobExecution::new(&job.id);
466            store.save_execution(&exec).await.unwrap();
467            tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
468        }
469
470        let executions = store.load_executions(&job.id, 10).await.unwrap();
471        assert_eq!(executions.len(), 3);
472
473        // Delete job should also delete executions
474        store.delete_job(&job.id).await.unwrap();
475        let executions = store.load_executions(&job.id, 10).await.unwrap();
476        assert!(executions.is_empty());
477    }
478}