1use chrono::{DateTime, Utc};
2use forge_core::job::{JobPriority, JobStatus};
3use uuid::Uuid;
4
5#[derive(Debug, Clone)]
7pub struct JobRecord {
8 pub id: Uuid,
10 pub job_type: String,
12 pub input: serde_json::Value,
14 pub output: Option<serde_json::Value>,
16 pub job_context: serde_json::Value,
18 pub status: JobStatus,
20 pub priority: i32,
22 pub attempts: i32,
24 pub max_attempts: i32,
26 pub last_error: Option<String>,
28 pub worker_capability: Option<String>,
30 pub worker_id: Option<Uuid>,
32 pub idempotency_key: Option<String>,
34 pub owner_subject: Option<String>,
36 pub scheduled_at: DateTime<Utc>,
38 pub created_at: DateTime<Utc>,
40 pub claimed_at: Option<DateTime<Utc>>,
42 pub started_at: Option<DateTime<Utc>>,
44 pub completed_at: Option<DateTime<Utc>>,
46 pub failed_at: Option<DateTime<Utc>>,
48 pub last_heartbeat: Option<DateTime<Utc>>,
50 pub cancel_requested_at: Option<DateTime<Utc>>,
52 pub cancelled_at: Option<DateTime<Utc>>,
54 pub cancel_reason: Option<String>,
56}
57
58impl JobRecord {
59 pub fn new(
61 job_type: impl Into<String>,
62 input: serde_json::Value,
63 priority: JobPriority,
64 max_attempts: i32,
65 ) -> Self {
66 Self {
67 id: Uuid::new_v4(),
68 job_type: job_type.into(),
69 input,
70 output: None,
71 job_context: serde_json::json!({}),
72 status: JobStatus::Pending,
73 priority: priority.as_i32(),
74 attempts: 0,
75 max_attempts,
76 last_error: None,
77 worker_capability: None,
78 worker_id: None,
79 idempotency_key: None,
80 owner_subject: None,
81 scheduled_at: Utc::now(),
82 created_at: Utc::now(),
83 claimed_at: None,
84 started_at: None,
85 completed_at: None,
86 failed_at: None,
87 last_heartbeat: None,
88 cancel_requested_at: None,
89 cancelled_at: None,
90 cancel_reason: None,
91 }
92 }
93
94 pub fn with_capability(mut self, capability: impl Into<String>) -> Self {
96 self.worker_capability = Some(capability.into());
97 self
98 }
99
100 pub fn with_scheduled_at(mut self, at: DateTime<Utc>) -> Self {
102 self.scheduled_at = at;
103 self
104 }
105
106 pub fn with_idempotency_key(mut self, key: impl Into<String>) -> Self {
108 self.idempotency_key = Some(key.into());
109 self
110 }
111
112 pub fn with_owner_subject(mut self, owner_subject: Option<String>) -> Self {
114 self.owner_subject = owner_subject;
115 self
116 }
117}
118
119#[derive(Clone)]
121pub struct JobQueue {
122 pool: sqlx::PgPool,
123}
124
125impl JobQueue {
126 pub fn new(pool: sqlx::PgPool) -> Self {
128 Self { pool }
129 }
130
131 pub async fn enqueue(&self, job: JobRecord) -> Result<Uuid, sqlx::Error> {
133 if let Some(ref key) = job.idempotency_key {
135 let existing = sqlx::query_scalar!(
136 r#"
137 SELECT id FROM forge_jobs
138 WHERE idempotency_key = $1
139 AND status NOT IN ('completed', 'failed', 'dead_letter', 'cancelled')
140 "#,
141 key
142 )
143 .fetch_optional(&self.pool)
144 .await?;
145
146 if let Some(id) = existing {
147 return Ok(id); }
149 }
150
151 sqlx::query!(
152 r#"
153 INSERT INTO forge_jobs (
154 id, job_type, input, job_context, status, priority, attempts, max_attempts,
155 worker_capability, idempotency_key, owner_subject, scheduled_at, created_at
156 ) VALUES (
157 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13
158 )
159 "#,
160 job.id,
161 &job.job_type,
162 job.input as _,
163 job.job_context as _,
164 job.status.as_str(),
165 job.priority,
166 job.attempts,
167 job.max_attempts,
168 job.worker_capability as _,
169 job.idempotency_key as _,
170 job.owner_subject as _,
171 job.scheduled_at,
172 job.created_at,
173 )
174 .execute(&self.pool)
175 .await?;
176
177 Ok(job.id)
178 }
179
180 pub async fn claim(
182 &self,
183 worker_id: Uuid,
184 capabilities: &[String],
185 limit: i32,
186 ) -> Result<Vec<JobRecord>, sqlx::Error> {
187 let rows = sqlx::query!(
188 r#"
189 WITH claimable AS (
190 SELECT id
191 FROM forge_jobs
192 WHERE status = 'pending'
193 AND scheduled_at <= NOW()
194 AND (worker_capability = ANY($2) OR worker_capability IS NULL)
195 ORDER BY priority DESC, scheduled_at ASC
196 LIMIT $3
197 FOR UPDATE SKIP LOCKED
198 )
199 UPDATE forge_jobs
200 SET
201 status = 'claimed',
202 worker_id = $1,
203 claimed_at = NOW(),
204 attempts = attempts + 1
205 WHERE id IN (SELECT id FROM claimable)
206 RETURNING
207 id, job_type, input, output, job_context, status, priority,
208 attempts, max_attempts, last_error, worker_capability,
209 worker_id, idempotency_key, owner_subject, scheduled_at, created_at,
210 claimed_at, started_at, completed_at, failed_at, last_heartbeat,
211 cancel_requested_at, cancelled_at, cancel_reason
212 "#,
213 worker_id,
214 capabilities,
215 limit as i64,
216 )
217 .fetch_all(&self.pool)
218 .await?;
219
220 let jobs = rows
221 .into_iter()
222 .map(|row| JobRecord {
223 id: row.id,
224 job_type: row.job_type,
225 input: row.input,
226 output: row.output,
227 job_context: row.job_context,
228 status: row
229 .status
230 .parse()
231 .unwrap_or(forge_core::job::JobStatus::Failed),
232 priority: row.priority,
233 attempts: row.attempts,
234 max_attempts: row.max_attempts,
235 last_error: row.last_error,
236 worker_capability: row.worker_capability,
237 worker_id: row.worker_id,
238 idempotency_key: row.idempotency_key,
239 owner_subject: row.owner_subject,
240 scheduled_at: row.scheduled_at,
241 created_at: row.created_at,
242 claimed_at: row.claimed_at,
243 started_at: row.started_at,
244 completed_at: row.completed_at,
245 failed_at: row.failed_at,
246 last_heartbeat: row.last_heartbeat,
247 cancel_requested_at: row.cancel_requested_at,
248 cancelled_at: row.cancelled_at,
249 cancel_reason: row.cancel_reason,
250 })
251 .collect();
252
253 Ok(jobs)
254 }
255
256 pub async fn start(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
258 let result = sqlx::query!(
259 r#"
260 UPDATE forge_jobs
261 SET status = 'running', started_at = NOW(), last_heartbeat = NOW()
262 WHERE id = $1
263 AND status NOT IN ('cancel_requested', 'cancelled')
264 "#,
265 job_id,
266 )
267 .execute(&self.pool)
268 .await?;
269
270 if result.rows_affected() == 0 {
271 return Err(sqlx::Error::RowNotFound);
272 }
273
274 Ok(())
275 }
276
277 pub async fn complete(
281 &self,
282 job_id: Uuid,
283 output: serde_json::Value,
284 ttl: Option<std::time::Duration>,
285 ) -> Result<(), sqlx::Error> {
286 let expires_at = ttl.map(|d| {
287 chrono::Utc::now() + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
288 });
289
290 sqlx::query!(
291 r#"
292 UPDATE forge_jobs
293 SET
294 status = 'completed',
295 output = $2,
296 completed_at = NOW(),
297 cancel_requested_at = NULL,
298 cancelled_at = NULL,
299 cancel_reason = NULL,
300 expires_at = $3
301 WHERE id = $1
302 "#,
303 job_id,
304 output as _,
305 expires_at,
306 )
307 .execute(&self.pool)
308 .await?;
309
310 Ok(())
311 }
312
313 pub async fn fail(
317 &self,
318 job_id: Uuid,
319 error: &str,
320 retry_delay: Option<chrono::Duration>,
321 ttl: Option<std::time::Duration>,
322 ) -> Result<(), sqlx::Error> {
323 if let Some(delay) = retry_delay {
324 sqlx::query!(
326 r#"
327 UPDATE forge_jobs
328 SET
329 status = 'pending',
330 worker_id = NULL,
331 claimed_at = NULL,
332 started_at = NULL,
333 last_error = $2,
334 scheduled_at = NOW() + make_interval(secs => $3),
335 cancel_requested_at = NULL,
336 cancelled_at = NULL,
337 cancel_reason = NULL
338 WHERE id = $1
339 "#,
340 job_id,
341 error,
342 delay.num_seconds() as f64,
343 )
344 .execute(&self.pool)
345 .await?;
346 } else {
347 let expires_at = ttl.map(|d| {
349 chrono::Utc::now()
350 + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
351 });
352
353 sqlx::query!(
354 r#"
355 UPDATE forge_jobs
356 SET
357 status = 'dead_letter',
358 last_error = $2,
359 failed_at = NOW(),
360 cancel_requested_at = NULL,
361 cancelled_at = NULL,
362 cancel_reason = NULL,
363 expires_at = $3
364 WHERE id = $1
365 "#,
366 job_id,
367 error,
368 expires_at,
369 )
370 .execute(&self.pool)
371 .await?;
372 }
373
374 Ok(())
375 }
376
377 pub async fn heartbeat(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
379 sqlx::query!(
380 r#"
381 UPDATE forge_jobs
382 SET last_heartbeat = NOW()
383 WHERE id = $1
384 "#,
385 job_id,
386 )
387 .execute(&self.pool)
388 .await?;
389
390 Ok(())
391 }
392
393 pub async fn update_progress(
395 &self,
396 job_id: Uuid,
397 percent: i32,
398 message: &str,
399 ) -> Result<(), sqlx::Error> {
400 sqlx::query!(
401 r#"
402 UPDATE forge_jobs
403 SET progress_percent = $2, progress_message = $3, last_heartbeat = NOW()
404 WHERE id = $1
405 "#,
406 job_id,
407 percent,
408 message,
409 )
410 .execute(&self.pool)
411 .await?;
412
413 Ok(())
414 }
415
416 pub async fn set_context(
418 &self,
419 job_id: Uuid,
420 context: serde_json::Value,
421 ) -> Result<(), sqlx::Error> {
422 sqlx::query!(
423 r#"
424 UPDATE forge_jobs
425 SET job_context = $2
426 WHERE id = $1
427 "#,
428 job_id,
429 context as _,
430 )
431 .execute(&self.pool)
432 .await?;
433
434 Ok(())
435 }
436
437 pub async fn request_cancel(
443 &self,
444 job_id: Uuid,
445 reason: Option<&str>,
446 caller_subject: Option<&str>,
447 ) -> Result<bool, sqlx::Error> {
448 let row = sqlx::query!(
449 "SELECT status, owner_subject FROM forge_jobs WHERE id = $1",
450 job_id
451 )
452 .fetch_optional(&self.pool)
453 .await?;
454
455 let (status, owner_subject) = match row {
456 Some(r) => (r.status, r.owner_subject),
457 None => return Ok(false),
458 };
459
460 if let Some(ref owner) = owner_subject {
463 match caller_subject {
464 Some(caller) if caller == owner => { }
465 _ => return Ok(false), }
467 }
468
469 let terminal_statuses = [
470 JobStatus::Completed.as_str(),
471 JobStatus::Failed.as_str(),
472 JobStatus::DeadLetter.as_str(),
473 JobStatus::Cancelled.as_str(),
474 ];
475
476 if status == JobStatus::Running.as_str() {
477 let updated = sqlx::query!(
478 r#"
479 UPDATE forge_jobs
480 SET
481 status = 'cancel_requested',
482 cancel_requested_at = NOW(),
483 cancel_reason = COALESCE($2, cancel_reason)
484 WHERE id = $1
485 AND status = 'running'
486 "#,
487 job_id,
488 reason,
489 )
490 .execute(&self.pool)
491 .await?;
492
493 return Ok(updated.rows_affected() > 0);
494 }
495
496 if terminal_statuses.contains(&status.as_str()) {
497 return Ok(false);
498 }
499
500 let updated = sqlx::query!(
501 r#"
502 UPDATE forge_jobs
503 SET
504 status = 'cancelled',
505 cancelled_at = NOW(),
506 cancel_reason = COALESCE($2, cancel_reason)
507 WHERE id = $1
508 AND status NOT IN ('completed', 'failed', 'dead_letter', 'cancelled')
509 "#,
510 job_id,
511 reason,
512 )
513 .execute(&self.pool)
514 .await?;
515
516 Ok(updated.rows_affected() > 0)
517 }
518
519 pub async fn cancel(
523 &self,
524 job_id: Uuid,
525 reason: Option<&str>,
526 ttl: Option<std::time::Duration>,
527 ) -> Result<(), sqlx::Error> {
528 let expires_at = ttl.map(|d| {
529 chrono::Utc::now() + chrono::Duration::from_std(d).unwrap_or(chrono::Duration::days(7))
530 });
531
532 sqlx::query!(
533 r#"
534 UPDATE forge_jobs
535 SET
536 status = 'cancelled',
537 cancelled_at = NOW(),
538 cancel_reason = COALESCE($2, cancel_reason),
539 expires_at = $3
540 WHERE id = $1
541 "#,
542 job_id,
543 reason,
544 expires_at,
545 )
546 .execute(&self.pool)
547 .await?;
548
549 Ok(())
550 }
551
552 pub async fn release_stale(
554 &self,
555 stale_threshold: chrono::Duration,
556 ) -> Result<u64, sqlx::Error> {
557 let result = sqlx::query!(
558 r#"
559 UPDATE forge_jobs
560 SET
561 status = 'pending',
562 worker_id = NULL,
563 claimed_at = NULL,
564 started_at = NULL,
565 last_heartbeat = NULL
566 WHERE
567 (
568 status = 'claimed'
569 AND claimed_at < NOW() - make_interval(secs => $1)
570 )
571 OR (
572 status = 'running'
573 AND COALESCE(last_heartbeat, started_at, claimed_at) < NOW() - make_interval(secs => $1)
574 )
575 "#,
576 stale_threshold.num_seconds() as f64,
577 )
578 .execute(&self.pool)
579 .await?;
580
581 Ok(result.rows_affected())
582 }
583
584 pub async fn cleanup_expired(&self) -> Result<u64, sqlx::Error> {
589 let result = sqlx::query!(
590 r#"
591 DELETE FROM forge_jobs
592 WHERE expires_at IS NOT NULL
593 AND expires_at < NOW()
594 AND status IN ('completed', 'cancelled', 'failed', 'dead_letter')
595 "#,
596 )
597 .execute(&self.pool)
598 .await?;
599
600 Ok(result.rows_affected())
601 }
602
603 pub async fn stats(&self) -> Result<QueueStats, sqlx::Error> {
605 let row = sqlx::query!(
606 r#"
607 SELECT
608 COUNT(*) FILTER (WHERE status = 'pending') as "pending!",
609 COUNT(*) FILTER (WHERE status = 'claimed') as "claimed!",
610 COUNT(*) FILTER (WHERE status = 'running') as "running!",
611 COUNT(*) FILTER (WHERE status = 'completed') as "completed!",
612 COUNT(*) FILTER (WHERE status = 'cancelled') as "cancelled!",
613 COUNT(*) FILTER (WHERE status = 'failed') as "failed!",
614 COUNT(*) FILTER (WHERE status = 'dead_letter') as "dead_letter!"
615 FROM forge_jobs
616 "#,
617 )
618 .fetch_one(&self.pool)
619 .await?;
620
621 Ok(QueueStats {
622 pending: row.pending as u64,
623 claimed: row.claimed as u64,
624 running: row.running as u64,
625 completed: row.completed as u64,
626 cancelled: row.cancelled as u64,
627 failed: row.failed as u64,
628 dead_letter: row.dead_letter as u64,
629 })
630 }
631}
632
/// Snapshot of how many jobs are in each status.
///
/// `Default` yields all-zero counts. `PartialEq`/`Eq` allow two snapshots to
/// be compared directly, for example in assertions or change detection.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct QueueStats {
    /// Jobs with status `pending`.
    pub pending: u64,
    /// Jobs with status `claimed`.
    pub claimed: u64,
    /// Jobs with status `running`.
    pub running: u64,
    /// Jobs with status `completed`.
    pub completed: u64,
    /// Jobs with status `cancelled`.
    pub cancelled: u64,
    /// Jobs with status `failed`.
    pub failed: u64,
    /// Jobs with status `dead_letter`.
    pub dead_letter: u64,
}
644
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh record is pending, unattempted, and keeps the requested
    /// type, priority and attempt budget.
    #[test]
    fn test_job_record_creation() {
        let payload = serde_json::json!({});
        let record = JobRecord::new("send_email", payload, JobPriority::Normal, 3);

        assert_eq!(record.job_type, "send_email");
        assert_eq!(record.status, JobStatus::Pending);
        assert_eq!(record.priority, 50);
        assert_eq!(record.attempts, 0);
        assert_eq!(record.max_attempts, 3);
    }

    /// `with_capability` records the capability without disturbing priority.
    #[test]
    fn test_job_record_with_capability() {
        let payload = serde_json::json!({});
        let record =
            JobRecord::new("transcode", payload, JobPriority::High, 3).with_capability("media");

        assert_eq!(record.worker_capability.as_deref(), Some("media"));
        assert_eq!(record.priority, 75);
    }

    /// `with_idempotency_key` stores the key on the record.
    #[test]
    fn test_job_record_with_idempotency() {
        let payload = serde_json::json!({});
        let record = JobRecord::new("payment", payload, JobPriority::Critical, 5)
            .with_idempotency_key("payment-123");

        assert_eq!(record.idempotency_key.as_deref(), Some("payment-123"));
    }
}