1use chrono::{DateTime, Utc};
2use forge_core::job::{JobPriority, JobStatus};
3use uuid::Uuid;
4
/// In-memory representation of one row of the `forge_jobs` table.
///
/// Built locally with [`JobRecord::new`] before being handed to
/// `JobQueue::enqueue`, or reconstructed from the database by `JobQueue::claim`.
#[derive(Debug, Clone)]
pub struct JobRecord {
    /// Unique job identifier; `JobRecord::new` assigns a random v4 UUID.
    pub id: Uuid,
    /// Name of the kind of job (for example `"send_email"` in the tests).
    pub job_type: String,
    /// JSON payload supplied when the job is created.
    pub input: serde_json::Value,
    /// JSON result; `None` on creation, written by `JobQueue::complete`.
    pub output: Option<serde_json::Value>,
    /// Free-form JSON context; starts as `{}` and is replaced by `JobQueue::set_context`.
    pub job_context: serde_json::Value,
    /// Current lifecycle state; new records start as `JobStatus::Pending`.
    pub status: JobStatus,
    /// Numeric priority (from `JobPriority::as_i32`); `JobQueue::claim` takes higher values first.
    pub priority: i32,
    /// Number of times the job has been claimed; `JobQueue::claim` increments it.
    pub attempts: i32,
    /// Attempt limit supplied by the caller.
    // NOTE(review): not enforced anywhere in this file — presumably checked by the worker; confirm.
    pub max_attempts: i32,
    /// Most recent error message recorded by `JobQueue::fail`.
    pub last_error: Option<String>,
    /// Capability a worker must advertise to claim this job; `None` lets any worker claim it.
    pub worker_capability: Option<String>,
    /// Worker that currently holds the job; set on claim, cleared when the job is re-queued.
    pub worker_id: Option<Uuid>,
    /// Optional de-duplication key checked by `JobQueue::enqueue`.
    pub idempotency_key: Option<String>,
    /// Earliest time at which the job may be claimed.
    pub scheduled_at: DateTime<Utc>,
    /// Time the record was created.
    pub created_at: DateTime<Utc>,
    /// Time of the current claim, set by `JobQueue::claim`.
    pub claimed_at: Option<DateTime<Utc>>,
    /// Time the job was marked running, set by `JobQueue::start`.
    pub started_at: Option<DateTime<Utc>>,
    /// Time the job was marked completed, set by `JobQueue::complete`.
    pub completed_at: Option<DateTime<Utc>>,
    /// Time the job was dead-lettered, set by `JobQueue::fail` when no retry is scheduled.
    pub failed_at: Option<DateTime<Utc>>,
    /// Last liveness signal, set by `JobQueue::heartbeat` and `JobQueue::update_progress`.
    pub last_heartbeat: Option<DateTime<Utc>>,
    /// Time cancellation was requested for a running job (`JobQueue::request_cancel`).
    pub cancel_requested_at: Option<DateTime<Utc>>,
    /// Time the job was marked cancelled (`JobQueue::request_cancel` / `JobQueue::cancel`).
    pub cancelled_at: Option<DateTime<Utc>>,
    /// Optional human-readable reason supplied with a cancellation.
    pub cancel_reason: Option<String>,
}
55
56impl JobRecord {
57 pub fn new(
59 job_type: impl Into<String>,
60 input: serde_json::Value,
61 priority: JobPriority,
62 max_attempts: i32,
63 ) -> Self {
64 Self {
65 id: Uuid::new_v4(),
66 job_type: job_type.into(),
67 input,
68 output: None,
69 job_context: serde_json::json!({}),
70 status: JobStatus::Pending,
71 priority: priority.as_i32(),
72 attempts: 0,
73 max_attempts,
74 last_error: None,
75 worker_capability: None,
76 worker_id: None,
77 idempotency_key: None,
78 scheduled_at: Utc::now(),
79 created_at: Utc::now(),
80 claimed_at: None,
81 started_at: None,
82 completed_at: None,
83 failed_at: None,
84 last_heartbeat: None,
85 cancel_requested_at: None,
86 cancelled_at: None,
87 cancel_reason: None,
88 }
89 }
90
91 pub fn with_capability(mut self, capability: impl Into<String>) -> Self {
93 self.worker_capability = Some(capability.into());
94 self
95 }
96
97 pub fn with_scheduled_at(mut self, at: DateTime<Utc>) -> Self {
99 self.scheduled_at = at;
100 self
101 }
102
103 pub fn with_idempotency_key(mut self, key: impl Into<String>) -> Self {
105 self.idempotency_key = Some(key.into());
106 self
107 }
108}
109
/// Job queue persisted in the `forge_jobs` Postgres table.
///
/// Cloning the queue clones the underlying pool handle.
#[derive(Clone)]
pub struct JobQueue {
    /// Connection pool every query in this type runs against.
    pool: sqlx::PgPool,
}
115
116impl JobQueue {
117 pub fn new(pool: sqlx::PgPool) -> Self {
119 Self { pool }
120 }
121
122 pub async fn enqueue(&self, job: JobRecord) -> Result<Uuid, sqlx::Error> {
124 if let Some(ref key) = job.idempotency_key {
126 let existing: Option<(Uuid,)> = sqlx::query_as(
127 r#"
128 SELECT id FROM forge_jobs
129 WHERE idempotency_key = $1
130 AND status NOT IN ('completed', 'failed', 'dead_letter', 'cancelled')
131 "#,
132 )
133 .bind(key)
134 .fetch_optional(&self.pool)
135 .await?;
136
137 if let Some((id,)) = existing {
138 return Ok(id); }
140 }
141
142 sqlx::query(
143 r#"
144 INSERT INTO forge_jobs (
145 id, job_type, input, job_context, status, priority, attempts, max_attempts,
146 worker_capability, idempotency_key, scheduled_at, created_at
147 ) VALUES (
148 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12
149 )
150 "#,
151 )
152 .bind(job.id)
153 .bind(&job.job_type)
154 .bind(&job.input)
155 .bind(&job.job_context)
156 .bind(job.status.as_str())
157 .bind(job.priority)
158 .bind(job.attempts)
159 .bind(job.max_attempts)
160 .bind(&job.worker_capability)
161 .bind(&job.idempotency_key)
162 .bind(job.scheduled_at)
163 .bind(job.created_at)
164 .execute(&self.pool)
165 .await?;
166
167 Ok(job.id)
168 }
169
170 pub async fn claim(
172 &self,
173 worker_id: Uuid,
174 capabilities: &[String],
175 limit: i32,
176 ) -> Result<Vec<JobRecord>, sqlx::Error> {
177 let rows = sqlx::query(
178 r#"
179 WITH claimable AS (
180 SELECT id
181 FROM forge_jobs
182 WHERE status = 'pending'
183 AND scheduled_at <= NOW()
184 AND (worker_capability = ANY($2) OR worker_capability IS NULL)
185 ORDER BY priority DESC, scheduled_at ASC
186 LIMIT $3
187 FOR UPDATE SKIP LOCKED
188 )
189 UPDATE forge_jobs
190 SET
191 status = 'claimed',
192 worker_id = $1,
193 claimed_at = NOW(),
194 attempts = attempts + 1
195 WHERE id IN (SELECT id FROM claimable)
196 RETURNING
197 id, job_type, input, output, job_context, status, priority,
198 attempts, max_attempts, last_error, worker_capability,
199 worker_id, idempotency_key, scheduled_at, created_at,
200 claimed_at, started_at, completed_at, failed_at, last_heartbeat,
201 cancel_requested_at, cancelled_at, cancel_reason
202 "#,
203 )
204 .bind(worker_id)
205 .bind(capabilities)
206 .bind(limit)
207 .fetch_all(&self.pool)
208 .await?;
209
210 let jobs = rows
211 .iter()
212 .map(|row| {
213 use sqlx::Row;
214 JobRecord {
215 id: row.get("id"),
216 job_type: row.get("job_type"),
217 input: row.get("input"),
218 output: row.get("output"),
219 job_context: row.get("job_context"),
220 status: row.get::<String, _>("status").parse().unwrap(),
221 priority: row.get("priority"),
222 attempts: row.get("attempts"),
223 max_attempts: row.get("max_attempts"),
224 last_error: row.get("last_error"),
225 worker_capability: row.get("worker_capability"),
226 worker_id: row.get("worker_id"),
227 idempotency_key: row.get("idempotency_key"),
228 scheduled_at: row.get("scheduled_at"),
229 created_at: row.get("created_at"),
230 claimed_at: row.get("claimed_at"),
231 started_at: row.get("started_at"),
232 completed_at: row.get("completed_at"),
233 failed_at: row.get("failed_at"),
234 last_heartbeat: row.get("last_heartbeat"),
235 cancel_requested_at: row.get("cancel_requested_at"),
236 cancelled_at: row.get("cancelled_at"),
237 cancel_reason: row.get("cancel_reason"),
238 }
239 })
240 .collect();
241
242 Ok(jobs)
243 }
244
245 pub async fn start(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
247 let result = sqlx::query(
248 r#"
249 UPDATE forge_jobs
250 SET status = 'running', started_at = NOW()
251 WHERE id = $1
252 AND status NOT IN ('cancel_requested', 'cancelled')
253 "#,
254 )
255 .bind(job_id)
256 .execute(&self.pool)
257 .await?;
258
259 if result.rows_affected() == 0 {
260 return Err(sqlx::Error::RowNotFound);
261 }
262
263 Ok(())
264 }
265
266 pub async fn complete(
270 &self,
271 job_id: Uuid,
272 output: serde_json::Value,
273 ttl: Option<std::time::Duration>,
274 ) -> Result<(), sqlx::Error> {
275 let expires_at = ttl.map(|d| {
276 chrono::Utc::now() + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
277 });
278
279 sqlx::query(
280 r#"
281 UPDATE forge_jobs
282 SET
283 status = 'completed',
284 output = $2,
285 completed_at = NOW(),
286 cancel_requested_at = NULL,
287 cancelled_at = NULL,
288 cancel_reason = NULL,
289 expires_at = $3
290 WHERE id = $1
291 "#,
292 )
293 .bind(job_id)
294 .bind(output)
295 .bind(expires_at)
296 .execute(&self.pool)
297 .await?;
298
299 Ok(())
300 }
301
302 pub async fn fail(
306 &self,
307 job_id: Uuid,
308 error: &str,
309 retry_delay: Option<chrono::Duration>,
310 ttl: Option<std::time::Duration>,
311 ) -> Result<(), sqlx::Error> {
312 if let Some(delay) = retry_delay {
313 sqlx::query(
315 r#"
316 UPDATE forge_jobs
317 SET
318 status = 'pending',
319 worker_id = NULL,
320 claimed_at = NULL,
321 started_at = NULL,
322 last_error = $2,
323 scheduled_at = NOW() + $3,
324 cancel_requested_at = NULL,
325 cancelled_at = NULL,
326 cancel_reason = NULL
327 WHERE id = $1
328 "#,
329 )
330 .bind(job_id)
331 .bind(error)
332 .bind(delay)
333 .execute(&self.pool)
334 .await?;
335 } else {
336 let expires_at = ttl.map(|d| {
338 chrono::Utc::now()
339 + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
340 });
341
342 sqlx::query(
343 r#"
344 UPDATE forge_jobs
345 SET
346 status = 'dead_letter',
347 last_error = $2,
348 failed_at = NOW(),
349 cancel_requested_at = NULL,
350 cancelled_at = NULL,
351 cancel_reason = NULL,
352 expires_at = $3
353 WHERE id = $1
354 "#,
355 )
356 .bind(job_id)
357 .bind(error)
358 .bind(expires_at)
359 .execute(&self.pool)
360 .await?;
361 }
362
363 Ok(())
364 }
365
366 pub async fn heartbeat(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
368 sqlx::query(
369 r#"
370 UPDATE forge_jobs
371 SET last_heartbeat = NOW()
372 WHERE id = $1
373 "#,
374 )
375 .bind(job_id)
376 .execute(&self.pool)
377 .await?;
378
379 Ok(())
380 }
381
382 pub async fn update_progress(
384 &self,
385 job_id: Uuid,
386 percent: i32,
387 message: &str,
388 ) -> Result<(), sqlx::Error> {
389 sqlx::query(
390 r#"
391 UPDATE forge_jobs
392 SET progress_percent = $2, progress_message = $3, last_heartbeat = NOW()
393 WHERE id = $1
394 "#,
395 )
396 .bind(job_id)
397 .bind(percent)
398 .bind(message)
399 .execute(&self.pool)
400 .await?;
401
402 Ok(())
403 }
404
405 pub async fn set_context(
407 &self,
408 job_id: Uuid,
409 context: serde_json::Value,
410 ) -> Result<(), sqlx::Error> {
411 sqlx::query(
412 r#"
413 UPDATE forge_jobs
414 SET job_context = $2
415 WHERE id = $1
416 "#,
417 )
418 .bind(job_id)
419 .bind(context)
420 .execute(&self.pool)
421 .await?;
422
423 Ok(())
424 }
425
426 pub async fn request_cancel(
428 &self,
429 job_id: Uuid,
430 reason: Option<&str>,
431 ) -> Result<bool, sqlx::Error> {
432 let row: Option<(String,)> = sqlx::query_as(
433 r#"
434 SELECT status
435 FROM forge_jobs
436 WHERE id = $1
437 "#,
438 )
439 .bind(job_id)
440 .fetch_optional(&self.pool)
441 .await?;
442
443 let status = match row {
444 Some((status,)) => status,
445 None => return Ok(false),
446 };
447
448 let terminal_statuses = [
449 JobStatus::Completed.as_str(),
450 JobStatus::Failed.as_str(),
451 JobStatus::DeadLetter.as_str(),
452 JobStatus::Cancelled.as_str(),
453 ];
454
455 if status == JobStatus::Running.as_str() {
456 let updated = sqlx::query(
457 r#"
458 UPDATE forge_jobs
459 SET
460 status = 'cancel_requested',
461 cancel_requested_at = NOW(),
462 cancel_reason = COALESCE($2, cancel_reason)
463 WHERE id = $1
464 AND status = 'running'
465 "#,
466 )
467 .bind(job_id)
468 .bind(reason)
469 .execute(&self.pool)
470 .await?;
471
472 return Ok(updated.rows_affected() > 0);
473 }
474
475 if terminal_statuses.contains(&status.as_str()) {
476 return Ok(false);
477 }
478
479 let updated = sqlx::query(
480 r#"
481 UPDATE forge_jobs
482 SET
483 status = 'cancelled',
484 cancelled_at = NOW(),
485 cancel_reason = COALESCE($2, cancel_reason)
486 WHERE id = $1
487 AND status NOT IN ('completed', 'failed', 'dead_letter', 'cancelled')
488 "#,
489 )
490 .bind(job_id)
491 .bind(reason)
492 .execute(&self.pool)
493 .await?;
494
495 Ok(updated.rows_affected() > 0)
496 }
497
498 pub async fn cancel(
502 &self,
503 job_id: Uuid,
504 reason: Option<&str>,
505 ttl: Option<std::time::Duration>,
506 ) -> Result<(), sqlx::Error> {
507 let expires_at = ttl.map(|d| {
508 chrono::Utc::now() + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
509 });
510
511 sqlx::query(
512 r#"
513 UPDATE forge_jobs
514 SET
515 status = 'cancelled',
516 cancelled_at = NOW(),
517 cancel_reason = COALESCE($2, cancel_reason),
518 expires_at = $3
519 WHERE id = $1
520 "#,
521 )
522 .bind(job_id)
523 .bind(reason)
524 .bind(expires_at)
525 .execute(&self.pool)
526 .await?;
527
528 Ok(())
529 }
530
531 pub async fn release_stale(
533 &self,
534 stale_threshold: chrono::Duration,
535 ) -> Result<u64, sqlx::Error> {
536 let result = sqlx::query(
537 r#"
538 UPDATE forge_jobs
539 SET
540 status = 'pending',
541 worker_id = NULL,
542 claimed_at = NULL
543 WHERE status IN ('claimed', 'running')
544 AND claimed_at < NOW() - $1
545 "#,
546 )
547 .bind(stale_threshold)
548 .execute(&self.pool)
549 .await?;
550
551 Ok(result.rows_affected())
552 }
553
554 pub async fn cleanup_expired(&self) -> Result<u64, sqlx::Error> {
559 let result = sqlx::query(
560 r#"
561 DELETE FROM forge_jobs
562 WHERE expires_at IS NOT NULL
563 AND expires_at < NOW()
564 AND status IN ('completed', 'cancelled', 'failed', 'dead_letter')
565 "#,
566 )
567 .execute(&self.pool)
568 .await?;
569
570 Ok(result.rows_affected())
571 }
572
573 pub async fn stats(&self) -> Result<QueueStats, sqlx::Error> {
575 let row = sqlx::query(
576 r#"
577 SELECT
578 COUNT(*) FILTER (WHERE status = 'pending') as pending,
579 COUNT(*) FILTER (WHERE status = 'claimed') as claimed,
580 COUNT(*) FILTER (WHERE status = 'running') as running,
581 COUNT(*) FILTER (WHERE status = 'completed') as completed,
582 COUNT(*) FILTER (WHERE status = 'cancelled') as cancelled,
583 COUNT(*) FILTER (WHERE status = 'failed') as failed,
584 COUNT(*) FILTER (WHERE status = 'dead_letter') as dead_letter
585 FROM forge_jobs
586 "#,
587 )
588 .fetch_one(&self.pool)
589 .await?;
590
591 use sqlx::Row;
592 Ok(QueueStats {
593 pending: row.get::<i64, _>("pending") as u64,
594 claimed: row.get::<i64, _>("claimed") as u64,
595 running: row.get::<i64, _>("running") as u64,
596 completed: row.get::<i64, _>("completed") as u64,
597 cancelled: row.get::<i64, _>("cancelled") as u64,
598 failed: row.get::<i64, _>("failed") as u64,
599 dead_letter: row.get::<i64, _>("dead_letter") as u64,
600 })
601 }
602}
603
/// Per-status job counts, as produced by `JobQueue::stats`.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Jobs with status `pending`.
    pub pending: u64,
    /// Jobs with status `claimed`.
    pub claimed: u64,
    /// Jobs with status `running`.
    pub running: u64,
    /// Jobs with status `completed`.
    pub completed: u64,
    /// Jobs with status `cancelled`.
    pub cancelled: u64,
    /// Jobs with status `failed`.
    pub failed: u64,
    /// Jobs with status `dead_letter`.
    pub dead_letter: u64,
}
615
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a record with an empty JSON object as input.
    fn record(kind: &str, priority: JobPriority, max_attempts: i32) -> JobRecord {
        JobRecord::new(kind, serde_json::json!({}), priority, max_attempts)
    }

    /// A freshly built record is pending, unattempted and carries the given settings.
    #[test]
    fn test_job_record_creation() {
        let fresh = record("send_email", JobPriority::Normal, 3);

        assert_eq!(fresh.job_type, "send_email");
        assert_eq!(fresh.status, JobStatus::Pending);
        assert_eq!(fresh.priority, 50);
        assert_eq!(fresh.attempts, 0);
        assert_eq!(fresh.max_attempts, 3);
    }

    /// `with_capability` stores the capability and leaves the priority untouched.
    #[test]
    fn test_job_record_with_capability() {
        let restricted = record("transcode", JobPriority::High, 3).with_capability("media");

        assert_eq!(restricted.worker_capability.as_deref(), Some("media"));
        assert_eq!(restricted.priority, 75);
    }

    /// `with_idempotency_key` stores the key on the record.
    #[test]
    fn test_job_record_with_idempotency() {
        let keyed = record("payment", JobPriority::Critical, 5).with_idempotency_key("payment-123");

        assert_eq!(keyed.idempotency_key.as_deref(), Some("payment-123"));
    }
}
647}