1use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::{RwLock, Semaphore};
8use uuid::Uuid;
9
/// Identifier of a job held by [`JobQueue`].
///
/// Ids are produced by [`JobQueue::submit`] as the string form of a v4 UUID.
pub type JobId = String;
12
13#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
15#[serde(rename_all = "snake_case")]
16pub enum JobStatus {
17 Queued,
19 Running,
21 Completed,
23 Failed,
25 Cancelled,
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct JobResult {
32 pub output_directory: Option<String>,
34 pub records_generated: Option<usize>,
36 pub duration_seconds: Option<f64>,
38 pub error: Option<String>,
40 pub manifest_id: Option<String>,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct JobRequest {
47 #[serde(default)]
49 pub config: Option<String>,
50 #[serde(default)]
52 pub demo: bool,
53 #[serde(default)]
55 pub seed: Option<u64>,
56 #[serde(default)]
58 pub output_directory: Option<String>,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct JobEntry {
64 pub id: JobId,
66 pub status: JobStatus,
68 pub request: JobRequest,
70 pub submitted_at: DateTime<Utc>,
72 pub started_at: Option<DateTime<Utc>>,
74 pub completed_at: Option<DateTime<Utc>>,
76 pub result: Option<JobResult>,
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct JobSummary {
83 pub id: JobId,
84 pub status: JobStatus,
85 pub submitted_at: DateTime<Utc>,
86 pub started_at: Option<DateTime<Utc>>,
87 pub completed_at: Option<DateTime<Utc>>,
88}
89
90impl From<&JobEntry> for JobSummary {
91 fn from(entry: &JobEntry) -> Self {
92 Self {
93 id: entry.id.clone(),
94 status: entry.status.clone(),
95 submitted_at: entry.submitted_at,
96 started_at: entry.started_at,
97 completed_at: entry.completed_at,
98 }
99 }
100}
101
102pub struct JobQueue {
104 jobs: RwLock<HashMap<JobId, JobEntry>>,
105 concurrency_semaphore: Arc<Semaphore>,
106 max_concurrent: usize,
107}
108
109impl JobQueue {
110 pub fn new(max_concurrent: usize) -> Self {
112 Self {
113 jobs: RwLock::new(HashMap::new()),
114 concurrency_semaphore: Arc::new(Semaphore::new(max_concurrent)),
115 max_concurrent,
116 }
117 }
118
119 pub fn max_concurrent(&self) -> usize {
121 self.max_concurrent
122 }
123
124 pub async fn submit(&self, request: JobRequest) -> JobId {
126 let id = Uuid::new_v4().to_string();
127 let entry = JobEntry {
128 id: id.clone(),
129 status: JobStatus::Queued,
130 request,
131 submitted_at: Utc::now(),
132 started_at: None,
133 completed_at: None,
134 result: None,
135 };
136
137 let mut jobs = self.jobs.write().await;
138 jobs.insert(id.clone(), entry);
139 id
140 }
141
142 pub async fn get(&self, id: &str) -> Option<JobEntry> {
144 let jobs = self.jobs.read().await;
145 jobs.get(id).cloned()
146 }
147
148 pub async fn list(&self) -> Vec<JobSummary> {
150 let jobs = self.jobs.read().await;
151 let mut summaries: Vec<_> = jobs.values().map(JobSummary::from).collect();
152 summaries.sort_by(|a, b| b.submitted_at.cmp(&a.submitted_at));
153 summaries
154 }
155
156 pub async fn cancel(&self, id: &str) -> bool {
158 let mut jobs = self.jobs.write().await;
159 if let Some(entry) = jobs.get_mut(id) {
160 if entry.status == JobStatus::Queued {
161 entry.status = JobStatus::Cancelled;
162 entry.completed_at = Some(Utc::now());
163 return true;
164 }
165 }
166 false
167 }
168
169 pub async fn mark_running(&self, id: &str) {
171 let mut jobs = self.jobs.write().await;
172 if let Some(entry) = jobs.get_mut(id) {
173 entry.status = JobStatus::Running;
174 entry.started_at = Some(Utc::now());
175 }
176 }
177
178 pub async fn mark_completed(&self, id: &str, result: JobResult) {
180 let mut jobs = self.jobs.write().await;
181 if let Some(entry) = jobs.get_mut(id) {
182 entry.status = JobStatus::Completed;
183 entry.completed_at = Some(Utc::now());
184 entry.result = Some(result);
185 }
186 }
187
188 pub async fn mark_failed(&self, id: &str, error: String) {
190 let mut jobs = self.jobs.write().await;
191 if let Some(entry) = jobs.get_mut(id) {
192 entry.status = JobStatus::Failed;
193 entry.completed_at = Some(Utc::now());
194 entry.result = Some(JobResult {
195 output_directory: None,
196 records_generated: None,
197 duration_seconds: None,
198 error: Some(error),
199 manifest_id: None,
200 });
201 }
202 }
203
204 pub fn semaphore(&self) -> Arc<Semaphore> {
206 Arc::clone(&self.concurrency_semaphore)
207 }
208}
209
#[cfg(test)]
#[allow(clippy::unwrap_used)]
mod tests {
    use super::*;

    /// Builds the demo request shared by these tests; only the seed varies.
    fn demo_request(seed: Option<u64>) -> JobRequest {
        JobRequest {
            config: None,
            demo: true,
            seed,
            output_directory: None,
        }
    }

    /// A submitted job can be fetched and starts out queued.
    #[tokio::test]
    async fn test_submit_and_get() {
        let queue = JobQueue::new(4);
        let id = queue.submit(demo_request(Some(42))).await;

        let job = queue.get(&id).await.expect("job should exist");
        assert_eq!(job.status, JobStatus::Queued);
        assert!(job.request.demo);
    }

    /// Queued -> Running -> Completed updates status, timestamps, and result.
    #[tokio::test]
    async fn test_status_transitions() {
        let queue = JobQueue::new(4);
        let id = queue.submit(demo_request(None)).await;

        queue.mark_running(&id).await;
        let job = queue.get(&id).await.unwrap();
        assert_eq!(job.status, JobStatus::Running);
        assert!(job.started_at.is_some());

        let outcome = JobResult {
            output_directory: Some("/tmp/output".to_string()),
            records_generated: Some(1000),
            duration_seconds: Some(5.0),
            error: None,
            manifest_id: Some("run-123".to_string()),
        };
        queue.mark_completed(&id, outcome).await;

        let job = queue.get(&id).await.unwrap();
        assert_eq!(job.status, JobStatus::Completed);
        assert!(job.completed_at.is_some());
        assert_eq!(job.result.unwrap().records_generated, Some(1000));
    }

    /// Cancelling a queued job succeeds and marks it cancelled.
    #[tokio::test]
    async fn test_cancel_queued_job() {
        let queue = JobQueue::new(4);
        let id = queue.submit(demo_request(None)).await;

        assert!(queue.cancel(&id).await);
        let job = queue.get(&id).await.unwrap();
        assert_eq!(job.status, JobStatus::Cancelled);
    }

    /// Cancelling is refused once the job is running.
    #[tokio::test]
    async fn test_cannot_cancel_running_job() {
        let queue = JobQueue::new(4);
        let id = queue.submit(demo_request(None)).await;

        queue.mark_running(&id).await;
        assert!(!queue.cancel(&id).await);
    }

    /// Listing returns one summary per submitted job.
    #[tokio::test]
    async fn test_list_jobs() {
        let queue = JobQueue::new(4);
        queue.submit(demo_request(None)).await;
        queue.submit(demo_request(None)).await;

        let jobs = queue.list().await;
        assert_eq!(jobs.len(), 2);
    }

    /// Failing a running job stores the error message in the result.
    #[tokio::test]
    async fn test_mark_failed() {
        let queue = JobQueue::new(4);
        let id = queue.submit(demo_request(None)).await;

        queue.mark_running(&id).await;
        queue.mark_failed(&id, "Out of memory".to_string()).await;

        let job = queue.get(&id).await.unwrap();
        assert_eq!(job.status, JobStatus::Failed);
        assert_eq!(job.result.unwrap().error, Some("Out of memory".to_string()));
    }

    /// The exposed semaphore starts with the configured number of permits.
    #[tokio::test]
    async fn test_concurrency_semaphore() {
        let queue = JobQueue::new(2);
        let sem = queue.semaphore();
        assert_eq!(sem.available_permits(), 2);

        // Permits are held in named bindings so they stay alive across the asserts.
        let _permit1 = sem.acquire().await.unwrap();
        assert_eq!(sem.available_permits(), 1);

        let _permit2 = sem.acquire().await.unwrap();
        assert_eq!(sem.available_permits(), 0);
    }
}