1use chrono::{DateTime, Utc};
4use graphile_worker::{Job, JobKeyMode};
5use serde::{Deserialize, Serialize};
6use sqlx::Row;
7
8use super::BackfillClient;
9use crate::{BackfillError, JobSpec, Priority, Queue};
10
/// A job that permanently failed and was moved to the dead-letter queue
/// (the `backfill_dlq` table created by [`BackfillClient::init_dlq`]).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqJob {
    /// Primary key of the DLQ row (`backfill_dlq.id`).
    pub id: i64,
    /// Id of the failed job in graphile-worker's jobs table, if known.
    pub original_job_id: Option<i64>,
    /// Task name the job was enqueued under.
    pub task_identifier: String,
    /// The job's JSON payload, preserved verbatim.
    pub payload: serde_json::Value,
    /// Queue the job originally ran on; empty when it had no named queue.
    pub queue_name: String,
    /// Original job priority.
    pub priority: i32,
    /// Optional job key; unique within the DLQ via a partial unique index.
    pub job_key: Option<String>,
    /// Original max_attempts setting, if any.
    pub max_attempts: Option<i32>,
    /// Human-readable description of why the job failed.
    pub failure_reason: String,
    /// Number of recorded failures; accumulated when a keyed job is upserted again.
    pub failure_count: i32,
    /// Last error captured for the job, if any.
    pub last_error: Option<serde_json::Value>,
    /// When the job was originally created, if known.
    pub original_created_at: Option<DateTime<Utc>>,
    /// When the job was originally scheduled to run, if known.
    pub original_run_at: Option<DateTime<Utc>>,
    /// When the job was (last) moved to the DLQ.
    pub failed_at: DateTime<Utc>,
    /// How many times an operator has requeued this entry.
    pub requeued_count: i32,
    /// When this entry was last requeued, if ever.
    pub last_requeued_at: Option<DateTime<Utc>>,
    /// Free-form operator notes; overwritten on requeue.
    pub notes: Option<String>,
}
52
/// Filter and pagination options for [`BackfillClient::list_dlq_jobs`].
///
/// All fields are optional; `None` means "no constraint". `list_dlq_jobs`
/// applies defaults of limit 50 and offset 0.
#[derive(Debug, Clone, Default)]
pub struct DlqFilter {
    /// Only jobs with this task identifier.
    pub task_identifier: Option<String>,
    /// Only jobs from this queue.
    pub queue_name: Option<String>,
    /// Only jobs that failed at or after this time.
    pub failed_after: Option<DateTime<Utc>>,
    /// Only jobs that failed at or before this time.
    pub failed_before: Option<DateTime<Utc>>,
    /// Page size (default 50).
    pub limit: Option<i32>,
    /// Page offset (default 0).
    pub offset: Option<i32>,
}
69
/// One page of DLQ jobs plus pagination metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqJobList {
    /// The jobs on this page, newest failures first.
    pub jobs: Vec<DlqJob>,
    /// Total number of DLQ rows (for pagination).
    pub total: u32,
    /// Offset that produced this page.
    pub offset: i32,
    /// Limit that produced this page.
    pub limit: i32,
}
82
/// Aggregate statistics about the DLQ, produced by [`BackfillClient::dlq_stats`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqStats {
    /// Total number of DLQ rows.
    pub total_jobs: u32,
    /// Number of distinct task identifiers present.
    pub unique_tasks: u32,
    /// Number of distinct queue names present.
    pub unique_queues: u32,
    /// Mean failure_count over all rows (0.0 when the DLQ is empty).
    pub avg_failure_count: f64,
    /// Sum of requeued_count over all rows.
    pub total_requeued: u32,
    /// Earliest failed_at, if the DLQ is non-empty.
    pub oldest_failure: Option<DateTime<Utc>>,
    /// Latest failed_at, if the DLQ is non-empty.
    pub newest_failure: Option<DateTime<Utc>>,
    /// Top tasks by DLQ row count as (task_identifier, count); at most 10 entries.
    pub task_breakdown: Vec<(String, u32)>,
}
103
104impl BackfillClient {
105 pub async fn init_dlq(&self) -> Result<(), BackfillError> {
114 let create_table_query = format!(
116 r#"
117 CREATE TABLE IF NOT EXISTS {}.backfill_dlq (
118 id BIGSERIAL PRIMARY KEY,
119 -- Original job information
120 original_job_id BIGINT,
121 task_identifier VARCHAR(200) NOT NULL,
122 payload JSONB NOT NULL,
123
124 -- Job specification when originally enqueued
125 queue_name VARCHAR(100) NOT NULL DEFAULT 'default',
126 priority INTEGER NOT NULL DEFAULT 0,
127 job_key VARCHAR(200),
128 max_attempts INTEGER,
129
130 -- Failure information
131 failure_reason TEXT NOT NULL,
132 failure_count INTEGER NOT NULL DEFAULT 1,
133 last_error JSONB,
134
135 -- Timestamps
136 original_created_at TIMESTAMPTZ,
137 original_run_at TIMESTAMPTZ,
138 failed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
139
140 -- Admin tracking
141 requeued_count INTEGER NOT NULL DEFAULT 0,
142 last_requeued_at TIMESTAMPTZ,
143 notes TEXT
144 )
145 "#,
146 self.schema
147 );
148
149 sqlx::query(&create_table_query).execute(&self.pool).await?;
150
151 let indexes = vec![
153 format!(
154 "CREATE INDEX IF NOT EXISTS idx_backfill_dlq_task_identifier ON {}.backfill_dlq (task_identifier)",
155 self.schema
156 ),
157 format!(
158 "CREATE INDEX IF NOT EXISTS idx_backfill_dlq_failed_at ON {}.backfill_dlq (failed_at DESC)",
159 self.schema
160 ),
161 format!(
162 "CREATE INDEX IF NOT EXISTS idx_backfill_dlq_queue_name ON {}.backfill_dlq (queue_name)",
163 self.schema
164 ),
165 format!(
166 "CREATE INDEX IF NOT EXISTS idx_backfill_dlq_job_key ON {}.backfill_dlq (job_key) WHERE job_key IS NOT NULL",
167 self.schema
168 ),
169 format!(
172 "CREATE UNIQUE INDEX IF NOT EXISTS idx_backfill_dlq_job_key_unique ON {}.backfill_dlq (job_key) WHERE job_key IS NOT NULL",
173 self.schema
174 ),
175 ];
176
177 for index_query in indexes {
178 sqlx::query(&index_query).execute(&self.pool).await?;
179 }
180
181 Ok(())
182 }
183
184 pub async fn list_dlq_jobs(&self, filter: DlqFilter) -> Result<DlqJobList, BackfillError> {
186 let mut query_builder = sqlx::QueryBuilder::new("SELECT ");
187 query_builder.push(
188 r#"
189 id, original_job_id, task_identifier, payload, queue_name, priority,
190 job_key, max_attempts, failure_reason, failure_count, last_error,
191 original_created_at, original_run_at, failed_at, requeued_count,
192 last_requeued_at, notes
193 "#,
194 );
195 query_builder
196 .push(" FROM ")
197 .push(&self.schema)
198 .push(".backfill_dlq WHERE 1=1");
199
200 if let Some(task) = &filter.task_identifier {
202 query_builder.push(" AND task_identifier = ").push_bind(task);
203 }
204 if let Some(queue) = &filter.queue_name {
205 query_builder.push(" AND queue_name = ").push_bind(queue);
206 }
207 if let Some(from) = filter.failed_after {
208 query_builder.push(" AND failed_at >= ").push_bind(from);
209 }
210 if let Some(to) = filter.failed_before {
211 query_builder.push(" AND failed_at <= ").push_bind(to);
212 }
213
214 query_builder.push(" ORDER BY failed_at DESC");
216 query_builder.push(" LIMIT ").push_bind(filter.limit.unwrap_or(50));
217 query_builder.push(" OFFSET ").push_bind(filter.offset.unwrap_or(0));
218
219 let rows = query_builder.build().fetch_all(&self.pool).await?;
220
221 let jobs: Vec<DlqJob> = rows
222 .into_iter()
223 .map(|row| DlqJob {
224 id: row.get("id"),
225 original_job_id: row.get("original_job_id"),
226 task_identifier: row.get("task_identifier"),
227 payload: row.get("payload"),
228 queue_name: row.get("queue_name"),
229 priority: row.get("priority"),
230 job_key: row.get("job_key"),
231 max_attempts: row.get("max_attempts"),
232 failure_reason: row.get("failure_reason"),
233 failure_count: row.get("failure_count"),
234 last_error: row.get("last_error"),
235 original_created_at: row.get("original_created_at"),
236 original_run_at: row.get("original_run_at"),
237 failed_at: row.get("failed_at"),
238 requeued_count: row.get("requeued_count"),
239 last_requeued_at: row.get("last_requeued_at"),
240 notes: row.get("notes"),
241 })
242 .collect();
243
244 let now = Utc::now();
246 for job in &jobs {
247 let age_seconds = (now - job.failed_at).num_seconds() as f64;
248 crate::metrics::record_dlq_age(&job.task_identifier, age_seconds);
249 }
250
251 let count_query = format!("SELECT COUNT(*) FROM {}.backfill_dlq", self.schema);
253 let total: i64 = sqlx::query_scalar(&count_query).fetch_one(&self.pool).await?;
254
255 Ok(DlqJobList {
256 jobs,
257 total: total as u32,
258 offset: filter.offset.unwrap_or(0),
259 limit: filter.limit.unwrap_or(50),
260 })
261 }
262
263 pub async fn get_dlq_job(&self, dlq_id: i64) -> Result<Option<DlqJob>, BackfillError> {
265 let query = format!(
266 r#"
267 SELECT id, original_job_id, task_identifier, payload, queue_name, priority,
268 job_key, max_attempts, failure_reason, failure_count, last_error,
269 original_created_at, original_run_at, failed_at, requeued_count,
270 last_requeued_at, notes
271 FROM {}.backfill_dlq
272 WHERE id = $1
273 "#,
274 self.schema
275 );
276
277 let row = sqlx::query(&query).bind(dlq_id).fetch_optional(&self.pool).await?;
278
279 Ok(row.map(|row| {
280 let job = DlqJob {
281 id: row.get("id"),
282 original_job_id: row.get("original_job_id"),
283 task_identifier: row.get("task_identifier"),
284 payload: row.get("payload"),
285 queue_name: row.get("queue_name"),
286 priority: row.get("priority"),
287 job_key: row.get("job_key"),
288 max_attempts: row.get("max_attempts"),
289 failure_reason: row.get("failure_reason"),
290 failure_count: row.get("failure_count"),
291 last_error: row.get("last_error"),
292 original_created_at: row.get("original_created_at"),
293 original_run_at: row.get("original_run_at"),
294 failed_at: row.get("failed_at"),
295 requeued_count: row.get("requeued_count"),
296 last_requeued_at: row.get("last_requeued_at"),
297 notes: row.get("notes"),
298 };
299
300 let age_seconds = (Utc::now() - job.failed_at).num_seconds() as f64;
302 crate::metrics::record_dlq_age(&job.task_identifier, age_seconds);
303
304 job
305 }))
306 }
307
    /// Re-enqueue a DLQ entry as a fresh graphile-worker job.
    ///
    /// The job is enqueued with its original task, payload, queue, priority,
    /// job_key and max_attempts. On success the DLQ row is kept (not deleted):
    /// its `requeued_count` is incremented, `last_requeued_at` is set to NOW()
    /// and `notes` is overwritten with the supplied `notes`.
    ///
    /// # Errors
    /// - `BackfillError::DlqJobNotFound` if no DLQ row has id `dlq_id`.
    /// - `BackfillError::RuntimeError` if a job with the same job_key is
    ///   already in progress and therefore cannot be replaced.
    pub async fn requeue_dlq_job(&self, dlq_id: i64, notes: Option<String>) -> Result<Job, BackfillError> {
        let dlq_job = self
            .get_dlq_job(dlq_id)
            .await?
            .ok_or_else(|| BackfillError::DlqJobNotFound(dlq_id))?;

        // An empty queue_name is how add_to_dlq records "no serial queue".
        // NOTE(review): assumes Queue::Parallel is the "no named queue" case —
        // confirm against the Queue type's semantics.
        let queue = if dlq_job.queue_name.is_empty() {
            Queue::Parallel
        } else {
            Queue::Serial(dlq_job.queue_name.clone())
        };

        // Rebuild the original job spec. JobKeyMode::Replace lets the new job
        // supersede an existing job that still holds the same key.
        let spec = JobSpec {
            run_at: None, priority: Priority(dlq_job.priority as i16),
            queue,
            max_attempts: dlq_job.max_attempts,
            retry_policy: None, job_key: dlq_job.job_key.clone(),
            job_key_mode: JobKeyMode::Replace,
        };

        let outcome = self
            .enqueue(&dlq_job.task_identifier, &dlq_job.payload, spec.clone())
            .await?;

        // A keyed job currently being worked on cannot be replaced; surface
        // that as an explicit error instead of silently doing nothing.
        let job = match outcome {
            crate::EnqueueOutcome::Enqueued(job) => job,
            crate::EnqueueOutcome::AlreadyInProgress { job_key } => {
                return Err(BackfillError::RuntimeError(format!(
                    "Cannot requeue DLQ job {}: a job with key '{}' is already in progress",
                    dlq_id, job_key
                )));
            }
        };

        crate::metrics::record_dlq_job_requeued(&dlq_job.task_identifier, spec.queue.as_str());

        log::info!(
            "Job requeued from DLQ (dlq_id: {}, job_id: {}, task: {})",
            dlq_id,
            job.id(),
            dlq_job.task_identifier
        );

        // Track the requeue on the DLQ row itself; the row is intentionally
        // retained so operators keep the failure history.
        let update_query = format!(
            r#"
            UPDATE {}.backfill_dlq
            SET requeued_count = requeued_count + 1,
                last_requeued_at = NOW(),
                notes = $1
            WHERE id = $2
            "#,
            self.schema
        );

        sqlx::query(&update_query)
            .bind(&notes)
            .bind(dlq_id)
            .execute(&self.pool)
            .await?;

        Ok(*job)
    }
379
380 pub async fn delete_dlq_job(&self, dlq_id: i64) -> Result<bool, BackfillError> {
382 let task_identifier = if let Some(job) = self.get_dlq_job(dlq_id).await? {
384 Some(job.task_identifier.clone())
385 } else {
386 None
387 };
388
389 let query = format!("DELETE FROM {}.backfill_dlq WHERE id = $1", self.schema);
390 let result = sqlx::query(&query).bind(dlq_id).execute(&self.pool).await?;
391
392 let deleted = result.rows_affected() > 0;
393
394 if deleted && let Some(task) = task_identifier {
395 crate::metrics::record_dlq_job_deleted(&task);
396 log::info!("Job deleted from DLQ (dlq_id: {}, task: {})", dlq_id, task);
397 }
398
399 Ok(deleted)
400 }
401
402 pub async fn delete_dlq_by_job_key(&self, job_key: &str) -> Result<u64, BackfillError> {
410 let query = format!("DELETE FROM {}.backfill_dlq WHERE job_key = $1", self.schema);
411 let result = sqlx::query(&query).bind(job_key).execute(&self.pool).await?;
412
413 let deleted = result.rows_affected();
414
415 if deleted > 0 {
416 crate::metrics::record_dlq_job_deleted("(by_job_key)");
417 log::info!(
418 "DLQ entries deleted by job_key (job_key: {}, count: {})",
419 job_key,
420 deleted
421 );
422 }
423
424 Ok(deleted)
425 }
426
427 pub async fn dlq_stats(&self) -> Result<DlqStats, BackfillError> {
429 let query = format!(
430 r#"
431 SELECT
432 COUNT(*) as total_jobs,
433 COUNT(DISTINCT task_identifier) as unique_tasks,
434 COUNT(DISTINCT queue_name) as unique_queues,
435 COALESCE(AVG(failure_count)::FLOAT8, 0) as avg_failure_count,
436 COALESCE(SUM(requeued_count), 0) as total_requeued,
437 MIN(failed_at) as oldest_failure,
438 MAX(failed_at) as newest_failure
439 FROM {}.backfill_dlq
440 "#,
441 self.schema
442 );
443
444 let row = sqlx::query(&query).fetch_one(&self.pool).await?;
445
446 let task_breakdown_query = format!(
447 r#"
448 SELECT task_identifier, COUNT(*) as count
449 FROM {}.backfill_dlq
450 GROUP BY task_identifier
451 ORDER BY count DESC
452 LIMIT 10
453 "#,
454 self.schema
455 );
456
457 let task_rows = sqlx::query(&task_breakdown_query).fetch_all(&self.pool).await?;
458
459 let task_breakdown: Vec<(String, u32)> = task_rows
460 .into_iter()
461 .map(|row| (row.get("task_identifier"), row.get::<i64, _>("count") as u32))
462 .collect();
463
464 let stats = DlqStats {
465 total_jobs: row.get::<i64, _>("total_jobs") as u32,
466 unique_tasks: row.get::<i64, _>("unique_tasks") as u32,
467 unique_queues: row.get::<i64, _>("unique_queues") as u32,
468 avg_failure_count: row.get::<Option<f64>, _>("avg_failure_count").unwrap_or(0.0),
469 total_requeued: row.get::<i64, _>("total_requeued") as u32,
470 oldest_failure: row.get("oldest_failure"),
471 newest_failure: row.get("newest_failure"),
472 task_breakdown: task_breakdown.clone(),
473 };
474
475 crate::metrics::update_dlq_size(stats.total_jobs);
477
478 for (task, count) in &task_breakdown {
480 crate::metrics::update_dlq_size_by_task(task, *count);
481 }
482
483 Ok(stats)
484 }
485
    /// Insert (or update) a DLQ entry for a job that failed permanently.
    ///
    /// If the job carries a job_key and a DLQ row with that key already
    /// exists, the row is updated in place: `failure_count` is accumulated,
    /// the failure reason / last error / original_job_id are overwritten, and
    /// `failed_at` is reset to NOW(). Rows without a job_key always insert a
    /// new entry (NULL keys never conflict on the partial unique index).
    ///
    /// Returns the resulting DLQ row and records add/duration metrics.
    pub async fn add_to_dlq(
        &self,
        original_job: &Job,
        failure_reason: &str,
        last_error: Option<serde_json::Value>,
    ) -> Result<DlqJob, BackfillError> {
        // Timed so the operation duration can be reported to metrics below.
        let start = std::time::Instant::now();

        // Resolve the human-readable queue name from graphile-worker's private
        // queues table. A job with no queue id gets an empty string, which
        // requeue_dlq_job later interprets as "no serial queue".
        // NOTE(review): the "default" fallback only applies when the queue id
        // exists but its row is missing — confirm that asymmetry is intended.
        let queue_name = if let Some(queue_id) = original_job.job_queue_id() {
            let query = format!(
                "SELECT queue_name FROM {}._private_job_queues WHERE id = $1",
                self.schema
            );
            sqlx::query_scalar::<_, String>(&query)
                .bind(queue_id)
                .fetch_optional(&self.pool)
                .await?
                .unwrap_or_else(|| "default".to_string())
        } else {
            String::new()
        };

        // Upsert keyed on the partial unique index over job_key created by
        // init_dlq; the WHERE clause in ON CONFLICT must match that index.
        let upsert_query = format!(
            r#"
            INSERT INTO {}.backfill_dlq (
                original_job_id, task_identifier, payload, queue_name, priority,
                job_key, max_attempts, failure_reason, failure_count, last_error,
                original_created_at, original_run_at
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
            ON CONFLICT (job_key) WHERE job_key IS NOT NULL DO UPDATE SET
                failed_at = NOW(),
                failure_count = {schema}.backfill_dlq.failure_count + EXCLUDED.failure_count,
                failure_reason = EXCLUDED.failure_reason,
                last_error = EXCLUDED.last_error,
                original_job_id = EXCLUDED.original_job_id
            RETURNING *
            "#,
            self.schema,
            schema = self.schema
        );

        // Bind order must match the column list above. Note $9 (failure_count)
        // is bound to the job's attempt count, so keyed re-failures accumulate
        // total attempts in the DLQ row.
        let row = sqlx::query(&upsert_query)
            .bind(original_job.id())
            .bind(original_job.task_identifier())
            .bind(original_job.payload())
            .bind(queue_name)
            .bind(*original_job.priority())
            .bind(original_job.key())
            .bind(original_job.max_attempts())
            .bind(failure_reason)
            .bind(original_job.attempts())
            .bind(&last_error)
            .bind(original_job.created_at())
            .bind(original_job.run_at())
            .fetch_one(&self.pool)
            .await?;

        // Materialize the RETURNING row so callers see exactly what was stored
        // (including values produced by the ON CONFLICT update).
        let dlq_job = DlqJob {
            id: row.get("id"),
            original_job_id: row.get("original_job_id"),
            task_identifier: row.get("task_identifier"),
            payload: row.get("payload"),
            queue_name: row.get("queue_name"),
            priority: row.get("priority"),
            job_key: row.get("job_key"),
            max_attempts: row.get("max_attempts"),
            failure_reason: row.get("failure_reason"),
            failure_count: row.get("failure_count"),
            last_error: row.get("last_error"),
            original_created_at: row.get("original_created_at"),
            original_run_at: row.get("original_run_at"),
            failed_at: row.get("failed_at"),
            requeued_count: row.get("requeued_count"),
            last_requeued_at: row.get("last_requeued_at"),
            notes: row.get("notes"),
        };

        crate::metrics::record_db_operation("dlq_add", "success");
        crate::metrics::record_db_operation_duration("dlq_add", start.elapsed().as_secs_f64());
        crate::metrics::record_dlq_job_added(&dlq_job.queue_name, &dlq_job.task_identifier, &dlq_job.failure_reason);

        log::info!(
            "Job moved to DLQ (dlq_id: {}, task: {}, failure_reason: {})",
            dlq_job.id,
            dlq_job.task_identifier,
            dlq_job.failure_reason
        );

        Ok(dlq_job)
    }
588
589 pub async fn process_failed_jobs(&self) -> Result<u32, BackfillError> {
596 let find_failed_jobs_query = format!(
599 r#"
600 SELECT jobs.id, tasks.identifier AS task_identifier,
601 job_queues.queue_name, jobs.priority, jobs.key as job_key,
602 jobs.max_attempts, jobs.attempts, jobs.last_error,
603 jobs.created_at, jobs.run_at, jobs.updated_at, jobs.payload
604 FROM {}._private_jobs AS jobs
605 INNER JOIN {}._private_tasks AS tasks ON tasks.id = jobs.task_id
606 LEFT JOIN {}._private_job_queues AS job_queues ON job_queues.id = jobs.job_queue_id
607 WHERE jobs.attempts >= jobs.max_attempts
608 AND jobs.max_attempts > 0
609 AND jobs.id NOT IN (SELECT COALESCE(original_job_id, -1) FROM {}.backfill_dlq)
610 ORDER BY jobs.updated_at ASC
611 LIMIT 100
612 "#,
613 self.schema, self.schema, self.schema, self.schema
614 );
615
616 let failed_jobs = sqlx::query(&find_failed_jobs_query).fetch_all(&self.pool).await?;
617
618 let mut moved_count = 0;
619
620 for job_row in failed_jobs {
621 let job_id: i64 = job_row.get("id");
622 let task_identifier: String = job_row.get("task_identifier");
623 let payload: serde_json::Value = job_row.get("payload");
624 let queue_name: Option<String> = job_row.get("queue_name");
625 let queue_name = queue_name.unwrap_or_default();
628 let priority: i16 = job_row.get("priority");
629 let job_key: Option<String> = job_row.get("job_key");
630 let max_attempts: i16 = job_row.get("max_attempts");
631 let attempts: i16 = job_row.get("attempts");
632 let last_error: Option<String> = job_row.get("last_error");
633 let created_at: chrono::DateTime<chrono::Utc> = job_row.get("created_at");
634 let run_at: chrono::DateTime<chrono::Utc> = job_row.get("run_at");
635
636 let last_error_json = last_error.map(serde_json::Value::String);
638
639 let upsert_dlq_query = format!(
641 r#"
642 INSERT INTO {schema}.backfill_dlq (
643 original_job_id, task_identifier, payload, queue_name, priority,
644 job_key, max_attempts, failure_reason, failure_count, last_error,
645 original_created_at, original_run_at
646 ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
647 ON CONFLICT (job_key) WHERE job_key IS NOT NULL DO UPDATE SET
648 failed_at = NOW(),
649 failure_count = {schema}.backfill_dlq.failure_count + EXCLUDED.failure_count,
650 failure_reason = EXCLUDED.failure_reason,
651 last_error = EXCLUDED.last_error,
652 original_job_id = EXCLUDED.original_job_id
653 "#,
654 schema = self.schema
655 );
656
657 let failure_reason = format!("Job exceeded maximum retry attempts ({}/{})", attempts, max_attempts);
658
659 let upsert_result = sqlx::query(&upsert_dlq_query)
660 .bind(job_id)
661 .bind(&task_identifier)
662 .bind(&payload)
663 .bind(&queue_name)
664 .bind(priority)
665 .bind(&job_key)
666 .bind(max_attempts as i32)
667 .bind(failure_reason)
668 .bind(attempts as i32)
669 .bind(&last_error_json)
670 .bind(created_at)
671 .bind(run_at)
672 .execute(&self.pool)
673 .await;
674
675 match upsert_result {
676 Ok(_) => {
677 let delete_query = format!("DELETE FROM {}._private_jobs WHERE id = $1", self.schema);
679 match sqlx::query(&delete_query).bind(job_id).execute(&self.pool).await {
680 Ok(_) => {
681 moved_count += 1;
682 log::info!(
683 "Successfully moved failed job to DLQ (job_id: {}, task: {}, attempts: {}/{})",
684 job_id,
685 task_identifier,
686 attempts,
687 max_attempts
688 );
689 }
690 Err(e) => {
691 log::error!(
692 "Failed to delete job from main table after DLQ insertion (job_id: {}, task: {}, error: {})",
693 job_id,
694 task_identifier,
695 e
696 );
697 }
700 }
701 }
702 Err(e) => {
703 log::error!(
704 "Failed to insert job into DLQ (job_id: {}, task: {}, error: {})",
705 job_id,
706 task_identifier,
707 e
708 );
709 }
710 }
711 }
712
713 if moved_count > 0 {
714 log::info!("DLQ processing completed (moved_count: {})", moved_count);
715 }
716
717 Ok(moved_count)
718 }
719
720 pub fn start_dlq_processor(
734 &self,
735 interval: std::time::Duration,
736 cancellation_token: tokio_util::sync::CancellationToken,
737 ) -> tokio::task::JoinHandle<()> {
738 let client = self.clone();
739
740 tokio::spawn(async move {
741 let mut interval_timer = tokio::time::interval(interval);
742 interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
743
744 log::info!(
745 "Starting DLQ processor background task (interval_seconds: {})",
746 interval.as_secs()
747 );
748
749 loop {
750 tokio::select! {
751 _ = cancellation_token.cancelled() => {
752 log::info!("DLQ processor shutting down");
753 break;
754 }
755 _ = interval_timer.tick() => {
756 match client.process_failed_jobs().await {
757 Ok(count) if count > 0 => {
758 log::info!("DLQ processor moved failed jobs (moved_jobs: {})", count);
759 }
760 Ok(_) => {
761 }
763 Err(e) => {
764 log::error!("DLQ processor encountered error: {}", e);
765 }
766 }
767 }
768 }
769 }
770 })
771 }
772}