Skip to main content

datasynth_server/jobs/
queue.rs

1//! Job queue for async generation with submit/poll/cancel pattern.
2
3use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::{RwLock, Semaphore};
8use uuid::Uuid;
9
/// Unique identifier for a job.
///
/// `JobQueue::submit` generates these as the string form of a random (v4) UUID.
pub type JobId = String;
12
/// Status of a job.
///
/// Serialized with `snake_case` variant names (e.g. `"queued"`, `"running"`).
/// `JobQueue::submit` creates jobs as `Queued`; `JobQueue::mark_running`,
/// `JobQueue::mark_completed`, `JobQueue::mark_failed` and `JobQueue::cancel`
/// move them into the other states.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum JobStatus {
    /// Job is waiting to be processed.
    Queued,
    /// Job is currently being processed.
    Running,
    /// Job has completed successfully.
    Completed,
    /// Job has failed.
    Failed,
    /// Job was cancelled (only possible while it was still queued).
    Cancelled,
}
28
/// Outcome recorded for a finished job (successful or failed).
///
/// `JobQueue::mark_failed` fills in only `error` and leaves every other field `None`;
/// for successful jobs the caller of `JobQueue::mark_completed` supplies whichever
/// fields it has.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobResult {
    /// Output directory where results were written.
    pub output_directory: Option<String>,
    /// Number of records generated.
    pub records_generated: Option<usize>,
    /// Duration in seconds.
    pub duration_seconds: Option<f64>,
    /// Error message if failed.
    pub error: Option<String>,
    /// Run manifest ID if available.
    pub manifest_id: Option<String>,
}
43
/// Request to create a new job.
///
/// Every field is `#[serde(default)]`, so any of them may be omitted from the incoming
/// document: `demo` then defaults to `false` and the `Option` fields to `None`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobRequest {
    /// Configuration YAML or JSON as a string.
    #[serde(default)]
    pub config: Option<String>,
    /// Use demo preset if no config specified.
    #[serde(default)]
    pub demo: bool,
    /// Random seed.
    #[serde(default)]
    pub seed: Option<u64>,
    /// Output directory override.
    #[serde(default)]
    pub output_directory: Option<String>,
}
60
/// A job entry in the queue: the submitted request plus its lifecycle state.
///
/// `JobQueue::get` hands out clones of these, so a returned entry is a snapshot
/// and does not track later state changes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobEntry {
    /// Unique job ID.
    pub id: JobId,
    /// Current status.
    pub status: JobStatus,
    /// Job request parameters.
    pub request: JobRequest,
    /// When the job was submitted.
    pub submitted_at: DateTime<Utc>,
    /// When processing started (`None` until the job is marked running).
    pub started_at: Option<DateTime<Utc>>,
    /// When the job completed, failed, or was cancelled.
    pub completed_at: Option<DateTime<Utc>>,
    /// Job result (available when completed or failed).
    pub result: Option<JobResult>,
}
79
/// Summary view of a job (for listing).
///
/// Carries the identifying and timing fields of a [`JobEntry`] but omits the request
/// and result payloads; `JobQueue::list` returns these.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JobSummary {
    /// Unique job ID.
    pub id: JobId,
    /// Status at the time the summary was taken.
    pub status: JobStatus,
    /// When the job was submitted.
    pub submitted_at: DateTime<Utc>,
    /// When processing started, if it has.
    pub started_at: Option<DateTime<Utc>>,
    /// When the job completed, failed, or was cancelled, if it has.
    pub completed_at: Option<DateTime<Utc>>,
}
89
90impl From<&JobEntry> for JobSummary {
91    fn from(entry: &JobEntry) -> Self {
92        Self {
93            id: entry.id.clone(),
94            status: entry.status.clone(),
95            submitted_at: entry.submitted_at,
96            started_at: entry.started_at,
97            completed_at: entry.completed_at,
98        }
99    }
100}
101
102/// In-memory job queue with concurrency control.
103pub struct JobQueue {
104    jobs: RwLock<HashMap<JobId, JobEntry>>,
105    concurrency_semaphore: Arc<Semaphore>,
106    max_concurrent: usize,
107}
108
109impl JobQueue {
110    /// Creates a new job queue with the specified concurrency limit.
111    pub fn new(max_concurrent: usize) -> Self {
112        Self {
113            jobs: RwLock::new(HashMap::new()),
114            concurrency_semaphore: Arc::new(Semaphore::new(max_concurrent)),
115            max_concurrent,
116        }
117    }
118
119    /// Returns the max concurrent jobs setting.
120    pub fn max_concurrent(&self) -> usize {
121        self.max_concurrent
122    }
123
124    /// Submits a new job and returns its ID.
125    pub async fn submit(&self, request: JobRequest) -> JobId {
126        let id = Uuid::new_v4().to_string();
127        let entry = JobEntry {
128            id: id.clone(),
129            status: JobStatus::Queued,
130            request,
131            submitted_at: Utc::now(),
132            started_at: None,
133            completed_at: None,
134            result: None,
135        };
136
137        let mut jobs = self.jobs.write().await;
138        jobs.insert(id.clone(), entry);
139        id
140    }
141
142    /// Gets the current state of a job.
143    pub async fn get(&self, id: &str) -> Option<JobEntry> {
144        let jobs = self.jobs.read().await;
145        jobs.get(id).cloned()
146    }
147
148    /// Lists all jobs.
149    pub async fn list(&self) -> Vec<JobSummary> {
150        let jobs = self.jobs.read().await;
151        let mut summaries: Vec<_> = jobs.values().map(JobSummary::from).collect();
152        summaries.sort_by(|a, b| b.submitted_at.cmp(&a.submitted_at));
153        summaries
154    }
155
156    /// Attempts to cancel a queued job. Returns true if cancelled.
157    pub async fn cancel(&self, id: &str) -> bool {
158        let mut jobs = self.jobs.write().await;
159        if let Some(entry) = jobs.get_mut(id) {
160            if entry.status == JobStatus::Queued {
161                entry.status = JobStatus::Cancelled;
162                entry.completed_at = Some(Utc::now());
163                return true;
164            }
165        }
166        false
167    }
168
169    /// Marks a job as running.
170    pub async fn mark_running(&self, id: &str) {
171        let mut jobs = self.jobs.write().await;
172        if let Some(entry) = jobs.get_mut(id) {
173            entry.status = JobStatus::Running;
174            entry.started_at = Some(Utc::now());
175        }
176    }
177
178    /// Marks a job as completed.
179    pub async fn mark_completed(&self, id: &str, result: JobResult) {
180        let mut jobs = self.jobs.write().await;
181        if let Some(entry) = jobs.get_mut(id) {
182            entry.status = JobStatus::Completed;
183            entry.completed_at = Some(Utc::now());
184            entry.result = Some(result);
185        }
186    }
187
188    /// Marks a job as failed.
189    pub async fn mark_failed(&self, id: &str, error: String) {
190        let mut jobs = self.jobs.write().await;
191        if let Some(entry) = jobs.get_mut(id) {
192            entry.status = JobStatus::Failed;
193            entry.completed_at = Some(Utc::now());
194            entry.result = Some(JobResult {
195                output_directory: None,
196                records_generated: None,
197                duration_seconds: None,
198                error: Some(error),
199                manifest_id: None,
200            });
201        }
202    }
203
204    /// Returns a clone of the concurrency semaphore for job execution.
205    pub fn semaphore(&self) -> Arc<Semaphore> {
206        Arc::clone(&self.concurrency_semaphore)
207    }
208}
209
#[cfg(test)]
// Unwrapping a job that was just submitted is the clearest way to assert it exists.
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Builds a demo-preset request with no config or output override.
    fn demo_request(seed: Option<u64>) -> JobRequest {
        JobRequest {
            config: None,
            demo: true,
            seed,
            output_directory: None,
        }
    }

    /// Submits an unseeded demo request to `queue` and returns the new job's ID.
    async fn submit_demo(queue: &JobQueue) -> JobId {
        queue.submit(demo_request(None)).await
    }

    #[tokio::test]
    async fn test_submit_and_get() {
        let queue = JobQueue::new(4);
        let job_id = queue.submit(demo_request(Some(42))).await;

        let entry = queue.get(&job_id).await.expect("job should exist");
        assert_eq!(entry.status, JobStatus::Queued);
        assert!(entry.request.demo);
    }

    #[tokio::test]
    async fn test_status_transitions() {
        let queue = JobQueue::new(4);
        let job_id = submit_demo(&queue).await;

        // Queued -> Running
        queue.mark_running(&job_id).await;
        let running = queue.get(&job_id).await.unwrap();
        assert_eq!(running.status, JobStatus::Running);
        assert!(running.started_at.is_some());

        // Running -> Completed
        let outcome = JobResult {
            output_directory: Some("/tmp/output".to_string()),
            records_generated: Some(1000),
            duration_seconds: Some(5.0),
            error: None,
            manifest_id: Some("run-123".to_string()),
        };
        queue.mark_completed(&job_id, outcome).await;
        let finished = queue.get(&job_id).await.unwrap();
        assert_eq!(finished.status, JobStatus::Completed);
        assert!(finished.completed_at.is_some());
        assert_eq!(finished.result.unwrap().records_generated, Some(1000));
    }

    #[tokio::test]
    async fn test_cancel_queued_job() {
        let queue = JobQueue::new(4);
        let job_id = submit_demo(&queue).await;

        assert!(queue.cancel(&job_id).await);
        let entry = queue.get(&job_id).await.unwrap();
        assert_eq!(entry.status, JobStatus::Cancelled);
    }

    #[tokio::test]
    async fn test_cannot_cancel_running_job() {
        let queue = JobQueue::new(4);
        let job_id = submit_demo(&queue).await;

        queue.mark_running(&job_id).await;
        // A job that has started running must refuse cancellation.
        assert!(!queue.cancel(&job_id).await);
    }

    #[tokio::test]
    async fn test_list_jobs() {
        let queue = JobQueue::new(4);
        for _ in 0..2 {
            submit_demo(&queue).await;
        }

        let listed = queue.list().await;
        assert_eq!(listed.len(), 2);
    }

    #[tokio::test]
    async fn test_mark_failed() {
        let queue = JobQueue::new(4);
        let job_id = submit_demo(&queue).await;

        queue.mark_running(&job_id).await;
        queue.mark_failed(&job_id, "Out of memory".to_string()).await;

        let entry = queue.get(&job_id).await.unwrap();
        assert_eq!(entry.status, JobStatus::Failed);
        assert_eq!(entry.result.unwrap().error, Some("Out of memory".to_string()));
    }

    #[tokio::test]
    async fn test_concurrency_semaphore() {
        let queue = JobQueue::new(2);
        let permits = queue.semaphore();
        assert_eq!(permits.available_permits(), 2);

        // Bind the permits to named locals so they stay held until the end of the test.
        let _first = permits.acquire().await.unwrap();
        assert_eq!(permits.available_permits(), 1);

        let _second = permits.acquire().await.unwrap();
        assert_eq!(permits.available_permits(), 0);
    }
}
358}