1use chrono::{DateTime, Utc};
2use forge_core::job::{JobPriority, JobStatus};
3use uuid::Uuid;
4
/// In-memory representation of one job.
///
/// The fields correspond one-to-one to the columns that `JobQueue::claim`
/// reads back from the `forge_jobs` table.
#[derive(Debug, Clone)]
pub struct JobRecord {
    /// Job identifier; `JobRecord::new` assigns a random v4 UUID.
    pub id: Uuid,
    /// Name of the job type (the tests in this file use e.g. `"send_email"`).
    pub job_type: String,
    /// JSON payload supplied when the job is constructed.
    pub input: serde_json::Value,
    /// JSON result; `None` on construction, written by `JobQueue::complete`.
    pub output: Option<serde_json::Value>,
    /// Current lifecycle status; `JobRecord::new` starts at `JobStatus::Pending`.
    pub status: JobStatus,
    /// Numeric priority (from `JobPriority::as_i32`). `JobQueue::claim` orders
    /// candidates by `priority DESC`, so larger values are claimed first.
    pub priority: i32,
    /// Attempt counter; starts at 0 and is incremented by every `JobQueue::claim`.
    pub attempts: i32,
    /// Attempt limit supplied by the caller.
    // NOTE(review): no query visible in this file compares `attempts` against
    // this value — presumably enforced by the caller; confirm.
    pub max_attempts: i32,
    /// Most recent error message, written by `JobQueue::fail`.
    pub last_error: Option<String>,
    /// Capability a worker must advertise to claim this job. `None` means the
    /// capability filter in `JobQueue::claim` accepts the job for any worker.
    pub worker_capability: Option<String>,
    /// Worker that currently holds the job; set by `JobQueue::claim`.
    pub worker_id: Option<Uuid>,
    /// Optional de-duplication key checked by `JobQueue::enqueue`.
    pub idempotency_key: Option<String>,
    /// Earliest time the job may be claimed (`claim` requires `scheduled_at <= NOW()`).
    pub scheduled_at: DateTime<Utc>,
    /// Time the record was constructed.
    pub created_at: DateTime<Utc>,
    /// Set by `JobQueue::claim`; cleared again when the job returns to `pending`.
    pub claimed_at: Option<DateTime<Utc>>,
    /// Set by `JobQueue::start`; cleared by the retry branch of `JobQueue::fail`.
    pub started_at: Option<DateTime<Utc>>,
    /// Set by `JobQueue::complete`.
    pub completed_at: Option<DateTime<Utc>>,
    /// Set by `JobQueue::fail` when the job is moved to `dead_letter`.
    pub failed_at: Option<DateTime<Utc>>,
    /// Refreshed by `JobQueue::heartbeat` and `JobQueue::update_progress`.
    pub last_heartbeat: Option<DateTime<Utc>>,
}
47
48impl JobRecord {
49 pub fn new(
51 job_type: impl Into<String>,
52 input: serde_json::Value,
53 priority: JobPriority,
54 max_attempts: i32,
55 ) -> Self {
56 Self {
57 id: Uuid::new_v4(),
58 job_type: job_type.into(),
59 input,
60 output: None,
61 status: JobStatus::Pending,
62 priority: priority.as_i32(),
63 attempts: 0,
64 max_attempts,
65 last_error: None,
66 worker_capability: None,
67 worker_id: None,
68 idempotency_key: None,
69 scheduled_at: Utc::now(),
70 created_at: Utc::now(),
71 claimed_at: None,
72 started_at: None,
73 completed_at: None,
74 failed_at: None,
75 last_heartbeat: None,
76 }
77 }
78
79 pub fn with_capability(mut self, capability: impl Into<String>) -> Self {
81 self.worker_capability = Some(capability.into());
82 self
83 }
84
85 pub fn with_scheduled_at(mut self, at: DateTime<Utc>) -> Self {
87 self.scheduled_at = at;
88 self
89 }
90
91 pub fn with_idempotency_key(mut self, key: impl Into<String>) -> Self {
93 self.idempotency_key = Some(key.into());
94 self
95 }
96}
97
/// Job queue handle.
///
/// Wraps a `sqlx::PgPool`; every query issued by its methods in this file
/// targets the `forge_jobs` table.
#[derive(Clone)]
pub struct JobQueue {
    /// Connection pool on which all queries are executed.
    pool: sqlx::PgPool,
}
103
104impl JobQueue {
105 pub fn new(pool: sqlx::PgPool) -> Self {
107 Self { pool }
108 }
109
110 pub async fn enqueue(&self, job: JobRecord) -> Result<Uuid, sqlx::Error> {
112 if let Some(ref key) = job.idempotency_key {
114 let existing: Option<(Uuid,)> = sqlx::query_as(
115 r#"
116 SELECT id FROM forge_jobs
117 WHERE idempotency_key = $1
118 AND status NOT IN ('completed', 'failed', 'dead_letter')
119 "#,
120 )
121 .bind(key)
122 .fetch_optional(&self.pool)
123 .await?;
124
125 if let Some((id,)) = existing {
126 return Ok(id); }
128 }
129
130 sqlx::query(
131 r#"
132 INSERT INTO forge_jobs (
133 id, job_type, input, status, priority, attempts, max_attempts,
134 worker_capability, idempotency_key, scheduled_at, created_at
135 ) VALUES (
136 $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11
137 )
138 "#,
139 )
140 .bind(job.id)
141 .bind(&job.job_type)
142 .bind(&job.input)
143 .bind(job.status.as_str())
144 .bind(job.priority)
145 .bind(job.attempts)
146 .bind(job.max_attempts)
147 .bind(&job.worker_capability)
148 .bind(&job.idempotency_key)
149 .bind(job.scheduled_at)
150 .bind(job.created_at)
151 .execute(&self.pool)
152 .await?;
153
154 Ok(job.id)
155 }
156
157 pub async fn claim(
159 &self,
160 worker_id: Uuid,
161 capabilities: &[String],
162 limit: i32,
163 ) -> Result<Vec<JobRecord>, sqlx::Error> {
164 let rows = sqlx::query(
165 r#"
166 WITH claimable AS (
167 SELECT id
168 FROM forge_jobs
169 WHERE status = 'pending'
170 AND scheduled_at <= NOW()
171 AND (worker_capability = ANY($2) OR worker_capability IS NULL)
172 ORDER BY priority DESC, scheduled_at ASC
173 LIMIT $3
174 FOR UPDATE SKIP LOCKED
175 )
176 UPDATE forge_jobs
177 SET
178 status = 'claimed',
179 worker_id = $1,
180 claimed_at = NOW(),
181 attempts = attempts + 1
182 WHERE id IN (SELECT id FROM claimable)
183 RETURNING
184 id, job_type, input, output, status, priority,
185 attempts, max_attempts, last_error, worker_capability,
186 worker_id, idempotency_key, scheduled_at, created_at,
187 claimed_at, started_at, completed_at, failed_at, last_heartbeat
188 "#,
189 )
190 .bind(worker_id)
191 .bind(capabilities)
192 .bind(limit)
193 .fetch_all(&self.pool)
194 .await?;
195
196 let jobs = rows
197 .iter()
198 .map(|row| {
199 use sqlx::Row;
200 JobRecord {
201 id: row.get("id"),
202 job_type: row.get("job_type"),
203 input: row.get("input"),
204 output: row.get("output"),
205 status: row.get::<String, _>("status").parse().unwrap(),
206 priority: row.get("priority"),
207 attempts: row.get("attempts"),
208 max_attempts: row.get("max_attempts"),
209 last_error: row.get("last_error"),
210 worker_capability: row.get("worker_capability"),
211 worker_id: row.get("worker_id"),
212 idempotency_key: row.get("idempotency_key"),
213 scheduled_at: row.get("scheduled_at"),
214 created_at: row.get("created_at"),
215 claimed_at: row.get("claimed_at"),
216 started_at: row.get("started_at"),
217 completed_at: row.get("completed_at"),
218 failed_at: row.get("failed_at"),
219 last_heartbeat: row.get("last_heartbeat"),
220 }
221 })
222 .collect();
223
224 Ok(jobs)
225 }
226
227 pub async fn start(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
229 sqlx::query(
230 r#"
231 UPDATE forge_jobs
232 SET status = 'running', started_at = NOW()
233 WHERE id = $1
234 "#,
235 )
236 .bind(job_id)
237 .execute(&self.pool)
238 .await?;
239
240 Ok(())
241 }
242
243 pub async fn complete(
245 &self,
246 job_id: Uuid,
247 output: serde_json::Value,
248 ) -> Result<(), sqlx::Error> {
249 sqlx::query(
250 r#"
251 UPDATE forge_jobs
252 SET
253 status = 'completed',
254 output = $2,
255 completed_at = NOW()
256 WHERE id = $1
257 "#,
258 )
259 .bind(job_id)
260 .bind(output)
261 .execute(&self.pool)
262 .await?;
263
264 Ok(())
265 }
266
267 pub async fn fail(
269 &self,
270 job_id: Uuid,
271 error: &str,
272 retry_delay: Option<chrono::Duration>,
273 ) -> Result<(), sqlx::Error> {
274 if let Some(delay) = retry_delay {
275 sqlx::query(
277 r#"
278 UPDATE forge_jobs
279 SET
280 status = 'pending',
281 worker_id = NULL,
282 claimed_at = NULL,
283 started_at = NULL,
284 last_error = $2,
285 scheduled_at = NOW() + $3
286 WHERE id = $1
287 "#,
288 )
289 .bind(job_id)
290 .bind(error)
291 .bind(delay)
292 .execute(&self.pool)
293 .await?;
294 } else {
295 sqlx::query(
297 r#"
298 UPDATE forge_jobs
299 SET
300 status = 'dead_letter',
301 last_error = $2,
302 failed_at = NOW()
303 WHERE id = $1
304 "#,
305 )
306 .bind(job_id)
307 .bind(error)
308 .execute(&self.pool)
309 .await?;
310 }
311
312 Ok(())
313 }
314
315 pub async fn heartbeat(&self, job_id: Uuid) -> Result<(), sqlx::Error> {
317 sqlx::query(
318 r#"
319 UPDATE forge_jobs
320 SET last_heartbeat = NOW()
321 WHERE id = $1
322 "#,
323 )
324 .bind(job_id)
325 .execute(&self.pool)
326 .await?;
327
328 Ok(())
329 }
330
331 pub async fn update_progress(
333 &self,
334 job_id: Uuid,
335 percent: i32,
336 message: &str,
337 ) -> Result<(), sqlx::Error> {
338 sqlx::query(
339 r#"
340 UPDATE forge_jobs
341 SET progress_percent = $2, progress_message = $3, last_heartbeat = NOW()
342 WHERE id = $1
343 "#,
344 )
345 .bind(job_id)
346 .bind(percent)
347 .bind(message)
348 .execute(&self.pool)
349 .await?;
350
351 Ok(())
352 }
353
354 pub async fn release_stale(
356 &self,
357 stale_threshold: chrono::Duration,
358 ) -> Result<u64, sqlx::Error> {
359 let result = sqlx::query(
360 r#"
361 UPDATE forge_jobs
362 SET
363 status = 'pending',
364 worker_id = NULL,
365 claimed_at = NULL
366 WHERE status IN ('claimed', 'running')
367 AND claimed_at < NOW() - $1
368 "#,
369 )
370 .bind(stale_threshold)
371 .execute(&self.pool)
372 .await?;
373
374 Ok(result.rows_affected())
375 }
376
377 pub async fn stats(&self) -> Result<QueueStats, sqlx::Error> {
379 let row = sqlx::query(
380 r#"
381 SELECT
382 COUNT(*) FILTER (WHERE status = 'pending') as pending,
383 COUNT(*) FILTER (WHERE status = 'claimed') as claimed,
384 COUNT(*) FILTER (WHERE status = 'running') as running,
385 COUNT(*) FILTER (WHERE status = 'completed') as completed,
386 COUNT(*) FILTER (WHERE status = 'failed') as failed,
387 COUNT(*) FILTER (WHERE status = 'dead_letter') as dead_letter
388 FROM forge_jobs
389 "#,
390 )
391 .fetch_one(&self.pool)
392 .await?;
393
394 use sqlx::Row;
395 Ok(QueueStats {
396 pending: row.get::<i64, _>("pending") as u64,
397 claimed: row.get::<i64, _>("claimed") as u64,
398 running: row.get::<i64, _>("running") as u64,
399 completed: row.get::<i64, _>("completed") as u64,
400 failed: row.get::<i64, _>("failed") as u64,
401 dead_letter: row.get::<i64, _>("dead_letter") as u64,
402 })
403 }
404}
405
/// Number of jobs per status, as produced by `JobQueue::stats`.
#[derive(Debug, Clone, Default)]
pub struct QueueStats {
    /// Jobs whose status is `pending`.
    pub pending: u64,
    /// Jobs whose status is `claimed`.
    pub claimed: u64,
    /// Jobs whose status is `running`.
    pub running: u64,
    /// Jobs whose status is `completed`.
    pub completed: u64,
    /// Jobs whose status is `failed`.
    // NOTE(review): no query visible in this file writes the `failed` status
    // (`JobQueue::fail` uses `pending` or `dead_letter`) — confirm who sets it.
    pub failed: u64,
    /// Jobs whose status is `dead_letter`.
    pub dead_letter: u64,
}
416
417#[cfg(test)]
418mod tests {
419 use super::*;
420
421 #[test]
422 fn test_job_record_creation() {
423 let job = JobRecord::new("send_email", serde_json::json!({}), JobPriority::Normal, 3);
424
425 assert_eq!(job.job_type, "send_email");
426 assert_eq!(job.status, JobStatus::Pending);
427 assert_eq!(job.priority, 50);
428 assert_eq!(job.attempts, 0);
429 assert_eq!(job.max_attempts, 3);
430 }
431
432 #[test]
433 fn test_job_record_with_capability() {
434 let job = JobRecord::new("transcode", serde_json::json!({}), JobPriority::High, 3)
435 .with_capability("media");
436
437 assert_eq!(job.worker_capability, Some("media".to_string()));
438 assert_eq!(job.priority, 75);
439 }
440
441 #[test]
442 fn test_job_record_with_idempotency() {
443 let job = JobRecord::new("payment", serde_json::json!({}), JobPriority::Critical, 5)
444 .with_idempotency_key("payment-123");
445
446 assert_eq!(job.idempotency_key, Some("payment-123".to_string()));
447 }
448}