1use chrono::{DateTime, Utc};
2use forge_core::job::{JobPriority, JobStatus};
3use uuid::Uuid;
4
5#[derive(Debug, Clone)]
7pub struct JobRecord {
8 pub id: Uuid,
10 pub job_type: String,
12 pub input: serde_json::Value,
14 pub output: Option<serde_json::Value>,
16 pub job_context: serde_json::Value,
18 pub status: JobStatus,
20 pub priority: i32,
22 pub attempts: i32,
24 pub max_attempts: i32,
26 pub last_error: Option<String>,
28 pub worker_capability: Option<String>,
30 pub worker_id: Option<Uuid>,
32 pub idempotency_key: Option<String>,
34 pub owner_subject: Option<String>,
36 pub scheduled_at: DateTime<Utc>,
38 pub created_at: DateTime<Utc>,
40 pub claimed_at: Option<DateTime<Utc>>,
42 pub started_at: Option<DateTime<Utc>>,
44 pub completed_at: Option<DateTime<Utc>>,
46 pub failed_at: Option<DateTime<Utc>>,
48 pub last_heartbeat: Option<DateTime<Utc>>,
50 pub cancel_requested_at: Option<DateTime<Utc>>,
52 pub cancelled_at: Option<DateTime<Utc>>,
54 pub cancel_reason: Option<String>,
56}
57
58impl JobRecord {
59 pub fn new(
61 job_type: impl Into<String>,
62 input: serde_json::Value,
63 priority: JobPriority,
64 max_attempts: i32,
65 ) -> Self {
66 Self {
67 id: Uuid::new_v4(),
68 job_type: job_type.into(),
69 input,
70 output: None,
71 job_context: serde_json::json!({}),
72 status: JobStatus::Pending,
73 priority: priority.as_i32(),
74 attempts: 0,
75 max_attempts,
76 last_error: None,
77 worker_capability: None,
78 worker_id: None,
79 idempotency_key: None,
80 owner_subject: None,
81 scheduled_at: Utc::now(),
82 created_at: Utc::now(),
83 claimed_at: None,
84 started_at: None,
85 completed_at: None,
86 failed_at: None,
87 last_heartbeat: None,
88 cancel_requested_at: None,
89 cancelled_at: None,
90 cancel_reason: None,
91 }
92 }
93
94 pub fn with_capability(mut self, capability: impl Into<String>) -> Self {
96 self.worker_capability = Some(capability.into());
97 self
98 }
99
100 pub fn with_scheduled_at(mut self, at: DateTime<Utc>) -> Self {
102 self.scheduled_at = at;
103 self
104 }
105
106 pub fn with_idempotency_key(mut self, key: impl Into<String>) -> Self {
108 self.idempotency_key = Some(key.into());
109 self
110 }
111
112 pub fn with_owner_subject(mut self, owner_subject: Option<String>) -> Self {
114 self.owner_subject = owner_subject;
115 self
116 }
117}
118
119#[derive(Clone)]
121pub struct JobQueue {
122 pool: sqlx::PgPool,
123}
124
125impl JobQueue {
126 pub fn new(pool: sqlx::PgPool) -> Self {
128 Self { pool }
129 }
130
131 pub async fn enqueue(&self, job: JobRecord) -> Result<Uuid, sqlx::Error> {
133 if let Some(ref key) = job.idempotency_key {
135 let existing: Option<(Uuid,)> = sqlx::query_as(
136 r#"
137 SELECT id FROM forge_jobs
138 WHERE idempotency_key = $1
139 AND status NOT IN ('completed', 'failed', 'dead_letter', 'cancelled')
140 "#,
141 )
142 .bind(key)
143 .fetch_optional(&self.pool)
144 .await?;
145
146 if let Some((id,)) = existing {
147 return Ok(id); }
149 }
150
151 sqlx::query(
152 r#"
153 INSERT INTO forge_jobs (
154 id, job_type, input, job_context, status, priority, attempts, max_attempts,
155 worker_capability, idempotency_key, owner_subject, scheduled_at, created_at
156 ) VALUES (
157 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13
158 )
159 "#,
160 )
161 .bind(job.id)
162 .bind(&job.job_type)
163 .bind(&job.input)
164 .bind(&job.job_context)
165 .bind(job.status.as_str())
166 .bind(job.priority)
167 .bind(job.attempts)
168 .bind(job.max_attempts)
169 .bind(&job.worker_capability)
170 .bind(&job.idempotency_key)
171 .bind(&job.owner_subject)
172 .bind(job.scheduled_at)
173 .bind(job.created_at)
174 .execute(&self.pool)
175 .await?;
176
177 Ok(job.id)
178 }
179
180 pub async fn claim(
182 &self,
183 worker_id: Uuid,
184 capabilities: &[String],
185 limit: i32,
186 ) -> Result<Vec<JobRecord>, sqlx::Error> {
187 let rows = sqlx::query(
188 r#"
189 WITH claimable AS (
190 SELECT id
191 FROM forge_jobs
192 WHERE status = 'pending'
193 AND scheduled_at <= NOW()
194 AND (worker_capability = ANY($2) OR worker_capability IS NULL)
195 ORDER BY priority DESC, scheduled_at ASC
196 LIMIT $3
197 FOR UPDATE SKIP LOCKED
198 )
199 UPDATE forge_jobs
200 SET
201 status = 'claimed',
202 worker_id = $1,
203 claimed_at = NOW(),
204 attempts = attempts + 1
205 WHERE id IN (SELECT id FROM claimable)
206 RETURNING
207 id, job_type, input, output, job_context, status, priority,
208 attempts, max_attempts, last_error, worker_capability,
209 worker_id, idempotency_key, owner_subject, scheduled_at, created_at,
210 claimed_at, started_at, completed_at, failed_at, last_heartbeat,
211 cancel_requested_at, cancelled_at, cancel_reason
212 "#,
213 )
214 .bind(worker_id)
215 .bind(capabilities)
216 .bind(limit)
217 .fetch_all(&self.pool)
218 .await?;
219
220 let jobs = rows
221 .iter()
222 .map(|row| {
223 use sqlx::Row;
224 JobRecord {
225 id: row.get("id"),
226 job_type: row.get("job_type"),
227 input: row.get("input"),
228 output: row.get("output"),
229 job_context: row.get("job_context"),
230 status: row
231 .get::<String, _>("status")
232 .parse()
233 .expect("valid job status from database"),
234 priority: row.get("priority"),
235 attempts: row.get("attempts"),
236 max_attempts: row.get("max_attempts"),
237 last_error: row.get("last_error"),
238 worker_capability: row.get("worker_capability"),
239 worker_id: row.get("worker_id"),
240 idempotency_key: row.get("idempotency_key"),
241 owner_subject: row.get("owner_subject"),
242 scheduled_at: row.get("scheduled_at"),
243 created_at: row.get("created_at"),
244 claimed_at: row.get("claimed_at"),
245 started_at: row.get("started_at"),
246 completed_at: row.get("completed_at"),
247 failed_at: row.get("failed_at"),
248 last_heartbeat: row.get("last_heartbeat"),
249 cancel_requested_at: row.get("cancel_requested_at"),
250 cancelled_at: row.get("cancelled_at"),
251 cancel_reason: row.get("cancel_reason"),
252 }
253 })
254 .collect();
255
256 Ok(jobs)
257 }
258
259 pub async fn start(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
261 let result = sqlx::query(
262 r#"
263 UPDATE forge_jobs
264 SET status = 'running', started_at = NOW(), last_heartbeat = NOW()
265 WHERE id = $1
266 AND status NOT IN ('cancel_requested', 'cancelled')
267 "#,
268 )
269 .bind(job_id)
270 .execute(&self.pool)
271 .await?;
272
273 if result.rows_affected() == 0 {
274 return Err(sqlx::Error::RowNotFound);
275 }
276
277 Ok(())
278 }
279
280 pub async fn complete(
284 &self,
285 job_id: Uuid,
286 output: serde_json::Value,
287 ttl: Option<std::time::Duration>,
288 ) -> Result<(), sqlx::Error> {
289 let expires_at = ttl.map(|d| {
290 chrono::Utc::now() + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
291 });
292
293 sqlx::query(
294 r#"
295 UPDATE forge_jobs
296 SET
297 status = 'completed',
298 output = $2,
299 completed_at = NOW(),
300 cancel_requested_at = NULL,
301 cancelled_at = NULL,
302 cancel_reason = NULL,
303 expires_at = $3
304 WHERE id = $1
305 "#,
306 )
307 .bind(job_id)
308 .bind(output)
309 .bind(expires_at)
310 .execute(&self.pool)
311 .await?;
312
313 Ok(())
314 }
315
316 pub async fn fail(
320 &self,
321 job_id: Uuid,
322 error: &str,
323 retry_delay: Option<chrono::Duration>,
324 ttl: Option<std::time::Duration>,
325 ) -> Result<(), sqlx::Error> {
326 if let Some(delay) = retry_delay {
327 sqlx::query(
329 r#"
330 UPDATE forge_jobs
331 SET
332 status = 'pending',
333 worker_id = NULL,
334 claimed_at = NULL,
335 started_at = NULL,
336 last_error = $2,
337 scheduled_at = NOW() + $3,
338 cancel_requested_at = NULL,
339 cancelled_at = NULL,
340 cancel_reason = NULL
341 WHERE id = $1
342 "#,
343 )
344 .bind(job_id)
345 .bind(error)
346 .bind(delay)
347 .execute(&self.pool)
348 .await?;
349 } else {
350 let expires_at = ttl.map(|d| {
352 chrono::Utc::now()
353 + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
354 });
355
356 sqlx::query(
357 r#"
358 UPDATE forge_jobs
359 SET
360 status = 'dead_letter',
361 last_error = $2,
362 failed_at = NOW(),
363 cancel_requested_at = NULL,
364 cancelled_at = NULL,
365 cancel_reason = NULL,
366 expires_at = $3
367 WHERE id = $1
368 "#,
369 )
370 .bind(job_id)
371 .bind(error)
372 .bind(expires_at)
373 .execute(&self.pool)
374 .await?;
375 }
376
377 Ok(())
378 }
379
380 pub async fn heartbeat(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
382 sqlx::query(
383 r#"
384 UPDATE forge_jobs
385 SET last_heartbeat = NOW()
386 WHERE id = $1
387 "#,
388 )
389 .bind(job_id)
390 .execute(&self.pool)
391 .await?;
392
393 Ok(())
394 }
395
396 pub async fn update_progress(
398 &self,
399 job_id: Uuid,
400 percent: i32,
401 message: &str,
402 ) -> Result<(), sqlx::Error> {
403 sqlx::query(
404 r#"
405 UPDATE forge_jobs
406 SET progress_percent = $2, progress_message = $3, last_heartbeat = NOW()
407 WHERE id = $1
408 "#,
409 )
410 .bind(job_id)
411 .bind(percent)
412 .bind(message)
413 .execute(&self.pool)
414 .await?;
415
416 Ok(())
417 }
418
419 pub async fn set_context(
421 &self,
422 job_id: Uuid,
423 context: serde_json::Value,
424 ) -> Result<(), sqlx::Error> {
425 sqlx::query(
426 r#"
427 UPDATE forge_jobs
428 SET job_context = $2
429 WHERE id = $1
430 "#,
431 )
432 .bind(job_id)
433 .bind(context)
434 .execute(&self.pool)
435 .await?;
436
437 Ok(())
438 }
439
440 pub async fn request_cancel(
442 &self,
443 job_id: Uuid,
444 reason: Option<&str>,
445 ) -> Result<bool, sqlx::Error> {
446 let row: Option<(String,)> = sqlx::query_as(
447 r#"
448 SELECT status
449 FROM forge_jobs
450 WHERE id = $1
451 "#,
452 )
453 .bind(job_id)
454 .fetch_optional(&self.pool)
455 .await?;
456
457 let status = match row {
458 Some((status,)) => status,
459 None => return Ok(false),
460 };
461
462 let terminal_statuses = [
463 JobStatus::Completed.as_str(),
464 JobStatus::Failed.as_str(),
465 JobStatus::DeadLetter.as_str(),
466 JobStatus::Cancelled.as_str(),
467 ];
468
469 if status == JobStatus::Running.as_str() {
470 let updated = sqlx::query(
471 r#"
472 UPDATE forge_jobs
473 SET
474 status = 'cancel_requested',
475 cancel_requested_at = NOW(),
476 cancel_reason = COALESCE($2, cancel_reason)
477 WHERE id = $1
478 AND status = 'running'
479 "#,
480 )
481 .bind(job_id)
482 .bind(reason)
483 .execute(&self.pool)
484 .await?;
485
486 return Ok(updated.rows_affected() > 0);
487 }
488
489 if terminal_statuses.contains(&status.as_str()) {
490 return Ok(false);
491 }
492
493 let updated = sqlx::query(
494 r#"
495 UPDATE forge_jobs
496 SET
497 status = 'cancelled',
498 cancelled_at = NOW(),
499 cancel_reason = COALESCE($2, cancel_reason)
500 WHERE id = $1
501 AND status NOT IN ('completed', 'failed', 'dead_letter', 'cancelled')
502 "#,
503 )
504 .bind(job_id)
505 .bind(reason)
506 .execute(&self.pool)
507 .await?;
508
509 Ok(updated.rows_affected() > 0)
510 }
511
512 pub async fn cancel(
516 &self,
517 job_id: Uuid,
518 reason: Option<&str>,
519 ttl: Option<std::time::Duration>,
520 ) -> Result<(), sqlx::Error> {
521 let expires_at = ttl.map(|d| {
522 chrono::Utc::now() + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
523 });
524
525 sqlx::query(
526 r#"
527 UPDATE forge_jobs
528 SET
529 status = 'cancelled',
530 cancelled_at = NOW(),
531 cancel_reason = COALESCE($2, cancel_reason),
532 expires_at = $3
533 WHERE id = $1
534 "#,
535 )
536 .bind(job_id)
537 .bind(reason)
538 .bind(expires_at)
539 .execute(&self.pool)
540 .await?;
541
542 Ok(())
543 }
544
545 pub async fn release_stale(
547 &self,
548 stale_threshold: chrono::Duration,
549 ) -> Result<u64, sqlx::Error> {
550 let result = sqlx::query(
551 r#"
552 UPDATE forge_jobs
553 SET
554 status = 'pending',
555 worker_id = NULL,
556 claimed_at = NULL,
557 started_at = NULL,
558 last_heartbeat = NULL
559 WHERE
560 (
561 status = 'claimed'
562 AND claimed_at < NOW() - $1
563 )
564 OR (
565 status = 'running'
566 AND COALESCE(last_heartbeat, started_at, claimed_at) < NOW() - $1
567 )
568 "#,
569 )
570 .bind(stale_threshold)
571 .execute(&self.pool)
572 .await?;
573
574 Ok(result.rows_affected())
575 }
576
577 pub async fn cleanup_expired(&self) -> Result<u64, sqlx::Error> {
582 let result = sqlx::query(
583 r#"
584 DELETE FROM forge_jobs
585 WHERE expires_at IS NOT NULL
586 AND expires_at < NOW()
587 AND status IN ('completed', 'cancelled', 'failed', 'dead_letter')
588 "#,
589 )
590 .execute(&self.pool)
591 .await?;
592
593 Ok(result.rows_affected())
594 }
595
596 pub async fn stats(&self) -> Result<QueueStats, sqlx::Error> {
598 let row = sqlx::query(
599 r#"
600 SELECT
601 COUNT(*) FILTER (WHERE status = 'pending') as pending,
602 COUNT(*) FILTER (WHERE status = 'claimed') as claimed,
603 COUNT(*) FILTER (WHERE status = 'running') as running,
604 COUNT(*) FILTER (WHERE status = 'completed') as completed,
605 COUNT(*) FILTER (WHERE status = 'cancelled') as cancelled,
606 COUNT(*) FILTER (WHERE status = 'failed') as failed,
607 COUNT(*) FILTER (WHERE status = 'dead_letter') as dead_letter
608 FROM forge_jobs
609 "#,
610 )
611 .fetch_one(&self.pool)
612 .await?;
613
614 use sqlx::Row;
615 Ok(QueueStats {
616 pending: row.get::<i64, _>("pending") as u64,
617 claimed: row.get::<i64, _>("claimed") as u64,
618 running: row.get::<i64, _>("running") as u64,
619 completed: row.get::<i64, _>("completed") as u64,
620 cancelled: row.get::<i64, _>("cancelled") as u64,
621 failed: row.get::<i64, _>("failed") as u64,
622 dead_letter: row.get::<i64, _>("dead_letter") as u64,
623 })
624 }
625}
626
/// Snapshot of how many jobs are in each tracked status.
///
/// All counters default to zero. Equality is derived so snapshots can be
/// compared directly, for example in tests.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueueStats {
    /// Jobs with status `pending`.
    pub pending: u64,
    /// Jobs with status `claimed`.
    pub claimed: u64,
    /// Jobs with status `running`.
    pub running: u64,
    /// Jobs with status `completed`.
    pub completed: u64,
    /// Jobs with status `cancelled`.
    pub cancelled: u64,
    /// Jobs with status `failed`.
    pub failed: u64,
    /// Jobs with status `dead_letter`.
    pub dead_letter: u64,
}
638
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built record is pending, unattempted, and carries the
    /// integer value of its priority.
    #[test]
    fn test_job_record_creation() {
        let payload = serde_json::json!({});
        let record = JobRecord::new("send_email", payload, JobPriority::Normal, 3);

        assert_eq!(record.job_type, "send_email");
        assert_eq!(record.status, JobStatus::Pending);
        assert_eq!(record.priority, 50);
        assert_eq!(record.attempts, 0);
        assert_eq!(record.max_attempts, 3);
    }

    /// `with_capability` stores the required worker capability.
    #[test]
    fn test_job_record_with_capability() {
        let payload = serde_json::json!({});
        let record =
            JobRecord::new("transcode", payload, JobPriority::High, 3).with_capability("media");

        assert_eq!(record.worker_capability.as_deref(), Some("media"));
        assert_eq!(record.priority, 75);
    }

    /// `with_idempotency_key` stores the de-duplication key.
    #[test]
    fn test_job_record_with_idempotency() {
        let payload = serde_json::json!({});
        let record = JobRecord::new("payment", payload, JobPriority::Critical, 5)
            .with_idempotency_key("payment-123");

        assert_eq!(record.idempotency_key.as_deref(), Some("payment-123"));
    }
}