1use chrono::{DateTime, Utc};
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::{RwLock, Semaphore};
8use uuid::Uuid;
9
/// Identifier of a job in the queue.
///
/// `JobQueue::submit` produces these as the string form of a freshly generated
/// v4 UUID, but the alias itself is a plain `String`.
pub type JobId = String;
12
13#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
15#[serde(rename_all = "snake_case")]
16pub enum JobStatus {
17 Queued,
19 Running,
21 Completed,
23 Failed,
25 Cancelled,
27}
28
29#[derive(Debug, Clone, Serialize, Deserialize)]
31pub struct JobResult {
32 pub output_directory: Option<String>,
34 pub records_generated: Option<usize>,
36 pub duration_seconds: Option<f64>,
38 pub error: Option<String>,
40 pub manifest_id: Option<String>,
42}
43
44#[derive(Debug, Clone, Serialize, Deserialize)]
46pub struct JobRequest {
47 #[serde(default)]
49 pub config: Option<String>,
50 #[serde(default)]
52 pub demo: bool,
53 #[serde(default)]
55 pub seed: Option<u64>,
56 #[serde(default)]
58 pub output_directory: Option<String>,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct JobEntry {
64 pub id: JobId,
66 pub status: JobStatus,
68 pub request: JobRequest,
70 pub submitted_at: DateTime<Utc>,
72 pub started_at: Option<DateTime<Utc>>,
74 pub completed_at: Option<DateTime<Utc>>,
76 pub result: Option<JobResult>,
78}
79
80#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct JobSummary {
83 pub id: JobId,
84 pub status: JobStatus,
85 pub submitted_at: DateTime<Utc>,
86 pub started_at: Option<DateTime<Utc>>,
87 pub completed_at: Option<DateTime<Utc>>,
88}
89
90impl From<&JobEntry> for JobSummary {
91 fn from(entry: &JobEntry) -> Self {
92 Self {
93 id: entry.id.clone(),
94 status: entry.status.clone(),
95 submitted_at: entry.submitted_at,
96 started_at: entry.started_at,
97 completed_at: entry.completed_at,
98 }
99 }
100}
101
102pub struct JobQueue {
104 jobs: RwLock<HashMap<JobId, JobEntry>>,
105 concurrency_semaphore: Arc<Semaphore>,
106 max_concurrent: usize,
107}
108
109impl JobQueue {
110 pub fn new(max_concurrent: usize) -> Self {
112 Self {
113 jobs: RwLock::new(HashMap::new()),
114 concurrency_semaphore: Arc::new(Semaphore::new(max_concurrent)),
115 max_concurrent,
116 }
117 }
118
119 pub fn max_concurrent(&self) -> usize {
121 self.max_concurrent
122 }
123
124 pub async fn submit(&self, request: JobRequest) -> JobId {
126 let id = Uuid::new_v4().to_string();
127 let entry = JobEntry {
128 id: id.clone(),
129 status: JobStatus::Queued,
130 request,
131 submitted_at: Utc::now(),
132 started_at: None,
133 completed_at: None,
134 result: None,
135 };
136
137 let mut jobs = self.jobs.write().await;
138 jobs.insert(id.clone(), entry);
139 id
140 }
141
142 pub async fn get(&self, id: &str) -> Option<JobEntry> {
144 let jobs = self.jobs.read().await;
145 jobs.get(id).cloned()
146 }
147
148 pub async fn list(&self) -> Vec<JobSummary> {
150 let jobs = self.jobs.read().await;
151 let mut summaries: Vec<_> = jobs.values().map(JobSummary::from).collect();
152 summaries.sort_by_key(|b| std::cmp::Reverse(b.submitted_at));
153 summaries
154 }
155
156 pub async fn cancel(&self, id: &str) -> bool {
158 let mut jobs = self.jobs.write().await;
159 if let Some(entry) = jobs.get_mut(id) {
160 if entry.status == JobStatus::Queued {
161 entry.status = JobStatus::Cancelled;
162 entry.completed_at = Some(Utc::now());
163 return true;
164 }
165 }
166 false
167 }
168
169 pub async fn mark_running(&self, id: &str) {
171 let mut jobs = self.jobs.write().await;
172 if let Some(entry) = jobs.get_mut(id) {
173 entry.status = JobStatus::Running;
174 entry.started_at = Some(Utc::now());
175 }
176 }
177
178 pub async fn mark_completed(&self, id: &str, result: JobResult) {
180 let mut jobs = self.jobs.write().await;
181 if let Some(entry) = jobs.get_mut(id) {
182 entry.status = JobStatus::Completed;
183 entry.completed_at = Some(Utc::now());
184 entry.result = Some(result);
185 }
186 }
187
188 pub async fn mark_failed(&self, id: &str, error: String) {
190 let mut jobs = self.jobs.write().await;
191 if let Some(entry) = jobs.get_mut(id) {
192 entry.status = JobStatus::Failed;
193 entry.completed_at = Some(Utc::now());
194 entry.result = Some(JobResult {
195 output_directory: None,
196 records_generated: None,
197 duration_seconds: None,
198 error: Some(error),
199 manifest_id: None,
200 });
201 }
202 }
203
204 pub fn semaphore(&self) -> Arc<Semaphore> {
206 Arc::clone(&self.concurrency_semaphore)
207 }
208}
209
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the minimal demo request used throughout these tests.
    fn demo_request(seed: Option<u64>) -> JobRequest {
        JobRequest {
            config: None,
            demo: true,
            seed,
            output_directory: None,
        }
    }

    #[tokio::test]
    async fn test_submit_and_get() {
        let queue = JobQueue::new(4);
        let id = queue.submit(demo_request(Some(42))).await;

        let job = queue.get(&id).await.expect("job should exist");
        assert_eq!(job.id, id);
        assert_eq!(job.status, JobStatus::Queued);
        assert!(job.request.demo);
        assert_eq!(job.request.seed, Some(42));
        assert!(job.started_at.is_none());
        assert!(job.completed_at.is_none());
        assert!(job.result.is_none());
    }

    #[tokio::test]
    async fn test_get_unknown_job_returns_none() {
        let queue = JobQueue::new(4);
        assert!(queue.get("no-such-job").await.is_none());
    }

    #[tokio::test]
    async fn test_status_transitions() {
        let queue = JobQueue::new(4);
        let id = queue.submit(demo_request(None)).await;

        queue.mark_running(&id).await;
        let job = queue.get(&id).await.unwrap();
        assert_eq!(job.status, JobStatus::Running);
        assert!(job.started_at.is_some());

        queue
            .mark_completed(
                &id,
                JobResult {
                    output_directory: Some("/tmp/output".to_string()),
                    records_generated: Some(1000),
                    duration_seconds: Some(5.0),
                    error: None,
                    manifest_id: Some("run-123".to_string()),
                },
            )
            .await;
        let job = queue.get(&id).await.unwrap();
        assert_eq!(job.status, JobStatus::Completed);
        assert!(job.completed_at.is_some());
        assert_eq!(job.result.unwrap().records_generated, Some(1000));
    }

    #[tokio::test]
    async fn test_cancel_queued_job() {
        let queue = JobQueue::new(4);
        let id = queue.submit(demo_request(None)).await;

        assert!(queue.cancel(&id).await);
        let job = queue.get(&id).await.unwrap();
        assert_eq!(job.status, JobStatus::Cancelled);
        assert!(job.completed_at.is_some());

        // A job that is no longer queued cannot be cancelled again.
        assert!(!queue.cancel(&id).await);
    }

    #[tokio::test]
    async fn test_cancel_unknown_job() {
        let queue = JobQueue::new(4);
        assert!(!queue.cancel("no-such-job").await);
    }

    #[tokio::test]
    async fn test_cannot_cancel_running_job() {
        let queue = JobQueue::new(4);
        let id = queue.submit(demo_request(None)).await;

        queue.mark_running(&id).await;
        assert!(!queue.cancel(&id).await);

        // The refused cancel must leave the job untouched.
        let job = queue.get(&id).await.unwrap();
        assert_eq!(job.status, JobStatus::Running);
        assert!(job.completed_at.is_none());
    }

    #[tokio::test]
    async fn test_list_jobs() {
        let queue = JobQueue::new(4);
        let first = queue.submit(demo_request(None)).await;
        let second = queue.submit(demo_request(None)).await;

        let jobs = queue.list().await;
        assert_eq!(jobs.len(), 2);
        assert!(jobs.iter().any(|job| job.id == first));
        assert!(jobs.iter().any(|job| job.id == second));
        assert!(jobs.iter().all(|job| job.status == JobStatus::Queued));
        // Newest first; `>=` tolerates two submissions sharing a timestamp.
        assert!(jobs[0].submitted_at >= jobs[1].submitted_at);
    }

    #[tokio::test]
    async fn test_mark_failed() {
        let queue = JobQueue::new(4);
        let id = queue.submit(demo_request(None)).await;

        queue.mark_running(&id).await;
        queue.mark_failed(&id, "Out of memory".to_string()).await;

        let job = queue.get(&id).await.unwrap();
        assert_eq!(job.status, JobStatus::Failed);
        assert!(job.completed_at.is_some());

        let result = job.result.expect("failed job should carry a result");
        assert_eq!(result.error, Some("Out of memory".to_string()));
        assert!(result.output_directory.is_none());
        assert!(result.records_generated.is_none());
        assert!(result.duration_seconds.is_none());
        assert!(result.manifest_id.is_none());
    }

    #[tokio::test]
    async fn test_concurrency_semaphore() {
        let queue = JobQueue::new(2);
        assert_eq!(queue.max_concurrent(), 2);

        let sem = queue.semaphore();
        assert_eq!(sem.available_permits(), 2);

        let _permit1 = sem.acquire().await.unwrap();
        assert_eq!(sem.available_permits(), 1);

        let _permit2 = sem.acquire().await.unwrap();
        assert_eq!(sem.available_permits(), 0);
    }
}