// backfill/client/dlq.rs
1//! Dead letter queue types
2
3use chrono::{DateTime, Utc};
4use graphile_worker::{Job, JobKeyMode};
5use serde::{Deserialize, Serialize};
6use sqlx::Row;
7
8use super::BackfillClient;
9use crate::{BackfillError, JobSpec, Priority, Queue};
10
11// === Dead Letter Queue Types ===
12
/// A job that has been moved to the dead letter queue after failing
/// permanently.
///
/// Rows are written by `add_to_dlq` / `process_failed_jobs`, re-enqueued via
/// `requeue_dlq_job`, and removed via `delete_dlq_job` / `delete_dlq_by_job_key`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqJob {
    /// Unique DLQ entry ID
    pub id: i64,
    /// Original job ID from the main queue
    pub original_job_id: Option<i64>,
    /// Task identifier for the failed job
    pub task_identifier: String,
    /// Job payload as JSON
    pub payload: serde_json::Value,
    /// Queue name where the job originally ran. An empty string marks a
    /// parallel (unnamed-queue) job; requeueing maps it to `Queue::Parallel`.
    pub queue_name: String,
    /// Job priority when originally enqueued
    pub priority: i32,
    /// Job key for deduplication (if any)
    pub job_key: Option<String>,
    /// Maximum retry attempts allowed
    pub max_attempts: Option<i32>,
    /// Human-readable failure reason
    pub failure_reason: String,
    /// Number of times the job failed (accumulates when a requeued job
    /// with the same job_key fails again — see the DLQ UPSERT)
    pub failure_count: i32,
    /// Last error details as JSON
    pub last_error: Option<serde_json::Value>,
    /// When the original job was created
    pub original_created_at: Option<DateTime<Utc>>,
    /// When the original job was scheduled to run
    pub original_run_at: Option<DateTime<Utc>>,
    /// When the job was moved to DLQ (refreshed if a requeued job fails again)
    pub failed_at: DateTime<Utc>,
    /// How many times this job has been requeued from DLQ
    pub requeued_count: i32,
    /// When this job was last requeued from DLQ
    pub last_requeued_at: Option<DateTime<Utc>>,
    /// Admin notes about this failure
    pub notes: Option<String>,
}
52
/// Filter parameters for querying DLQ jobs.
///
/// All set filters are combined with AND. When `limit`/`offset` are unset,
/// `list_dlq_jobs` falls back to a limit of 50 and an offset of 0.
#[derive(Debug, Clone, Default)]
pub struct DlqFilter {
    /// Filter by task identifier (exact match)
    pub task_identifier: Option<String>,
    /// Filter by queue name (exact match)
    pub queue_name: Option<String>,
    /// Only jobs that failed at or after this time (inclusive)
    pub failed_after: Option<DateTime<Utc>>,
    /// Only jobs that failed at or before this time (inclusive)
    pub failed_before: Option<DateTime<Utc>>,
    /// Maximum number of results to return (defaults to 50)
    pub limit: Option<i32>,
    /// Offset for pagination (defaults to 0)
    pub offset: Option<i32>,
}
69
/// Paginated list of DLQ jobs, as returned by `list_dlq_jobs`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqJobList {
    /// The jobs in this page, newest failure first
    pub jobs: Vec<DlqJob>,
    /// Total number of jobs matching the filter
    pub total: u32,
    /// Offset of this page
    pub offset: i32,
    /// Limit used for this page
    pub limit: i32,
}
82
/// Statistics about the dead letter queue, as returned by `dlq_stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DlqStats {
    /// Total number of jobs in the DLQ
    pub total_jobs: u32,
    /// Number of unique task types in the DLQ
    pub unique_tasks: u32,
    /// Number of unique queues represented in the DLQ
    pub unique_queues: u32,
    /// Average number of failures per job (0.0 when the DLQ is empty)
    pub avg_failure_count: f64,
    /// Total number of requeue operations performed
    pub total_requeued: u32,
    /// Timestamp of the oldest failure (None when the DLQ is empty)
    pub oldest_failure: Option<DateTime<Utc>>,
    /// Timestamp of the newest failure (None when the DLQ is empty)
    pub newest_failure: Option<DateTime<Utc>>,
    /// Breakdown of jobs by task type (top 10, descending by count)
    pub task_breakdown: Vec<(String, u32)>,
}
103
104impl BackfillClient {
105    /// Initialize the DLQ table if it doesn't exist.
106    /// This creates the necessary schema for dead letter queue functionality.
107    ///
108    /// For production environments with controlled migrations, consider using
109    /// the SQL migration files instead: `docs/dlq_schema.sql` contains the
110    /// complete schema definition that can be run with psql or your
111    /// preferred migration tool. See `docs/DLQ_MIGRATIONS.md` for detailed
112    /// instructions.
113    pub async fn init_dlq(&self) -> Result<(), BackfillError> {
114        // Create the table first
115        let create_table_query = format!(
116            r#"
117            CREATE TABLE IF NOT EXISTS {}.backfill_dlq (
118                id BIGSERIAL PRIMARY KEY,
119                -- Original job information
120                original_job_id BIGINT,
121                task_identifier VARCHAR(200) NOT NULL,
122                payload JSONB NOT NULL,
123
124                -- Job specification when originally enqueued
125                queue_name VARCHAR(100) NOT NULL DEFAULT 'default',
126                priority INTEGER NOT NULL DEFAULT 0,
127                job_key VARCHAR(200),
128                max_attempts INTEGER,
129
130                -- Failure information
131                failure_reason TEXT NOT NULL,
132                failure_count INTEGER NOT NULL DEFAULT 1,
133                last_error JSONB,
134
135                -- Timestamps
136                original_created_at TIMESTAMPTZ,
137                original_run_at TIMESTAMPTZ,
138                failed_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
139
140                -- Admin tracking
141                requeued_count INTEGER NOT NULL DEFAULT 0,
142                last_requeued_at TIMESTAMPTZ,
143                notes TEXT
144            )
145        "#,
146            self.schema
147        );
148
149        sqlx::query(&create_table_query).execute(&self.pool).await?;
150
151        // Create indexes separately
152        let indexes = vec![
153            format!(
154                "CREATE INDEX IF NOT EXISTS idx_backfill_dlq_task_identifier ON {}.backfill_dlq (task_identifier)",
155                self.schema
156            ),
157            format!(
158                "CREATE INDEX IF NOT EXISTS idx_backfill_dlq_failed_at ON {}.backfill_dlq (failed_at DESC)",
159                self.schema
160            ),
161            format!(
162                "CREATE INDEX IF NOT EXISTS idx_backfill_dlq_queue_name ON {}.backfill_dlq (queue_name)",
163                self.schema
164            ),
165            format!(
166                "CREATE INDEX IF NOT EXISTS idx_backfill_dlq_job_key ON {}.backfill_dlq (job_key) WHERE job_key IS NOT NULL",
167                self.schema
168            ),
169            // Unique constraint on job_key for UPSERT support - prevents duplicate DLQ entries
170            // when a requeued job fails again. Only applies to non-NULL job_keys.
171            format!(
172                "CREATE UNIQUE INDEX IF NOT EXISTS idx_backfill_dlq_job_key_unique ON {}.backfill_dlq (job_key) WHERE job_key IS NOT NULL",
173                self.schema
174            ),
175        ];
176
177        for index_query in indexes {
178            sqlx::query(&index_query).execute(&self.pool).await?;
179        }
180
181        Ok(())
182    }
183
184    /// List jobs in the dead letter queue with pagination and filtering.
185    pub async fn list_dlq_jobs(&self, filter: DlqFilter) -> Result<DlqJobList, BackfillError> {
186        let mut query_builder = sqlx::QueryBuilder::new("SELECT ");
187        query_builder.push(
188            r#"
189            id, original_job_id, task_identifier, payload, queue_name, priority,
190            job_key, max_attempts, failure_reason, failure_count, last_error,
191            original_created_at, original_run_at, failed_at, requeued_count,
192            last_requeued_at, notes
193        "#,
194        );
195        query_builder
196            .push(" FROM ")
197            .push(&self.schema)
198            .push(".backfill_dlq WHERE 1=1");
199
200        // Apply filters
201        if let Some(task) = &filter.task_identifier {
202            query_builder.push(" AND task_identifier = ").push_bind(task);
203        }
204        if let Some(queue) = &filter.queue_name {
205            query_builder.push(" AND queue_name = ").push_bind(queue);
206        }
207        if let Some(from) = filter.failed_after {
208            query_builder.push(" AND failed_at >= ").push_bind(from);
209        }
210        if let Some(to) = filter.failed_before {
211            query_builder.push(" AND failed_at <= ").push_bind(to);
212        }
213
214        // Order and pagination
215        query_builder.push(" ORDER BY failed_at DESC");
216        query_builder.push(" LIMIT ").push_bind(filter.limit.unwrap_or(50));
217        query_builder.push(" OFFSET ").push_bind(filter.offset.unwrap_or(0));
218
219        let rows = query_builder.build().fetch_all(&self.pool).await?;
220
221        let jobs: Vec<DlqJob> = rows
222            .into_iter()
223            .map(|row| DlqJob {
224                id: row.get("id"),
225                original_job_id: row.get("original_job_id"),
226                task_identifier: row.get("task_identifier"),
227                payload: row.get("payload"),
228                queue_name: row.get("queue_name"),
229                priority: row.get("priority"),
230                job_key: row.get("job_key"),
231                max_attempts: row.get("max_attempts"),
232                failure_reason: row.get("failure_reason"),
233                failure_count: row.get("failure_count"),
234                last_error: row.get("last_error"),
235                original_created_at: row.get("original_created_at"),
236                original_run_at: row.get("original_run_at"),
237                failed_at: row.get("failed_at"),
238                requeued_count: row.get("requeued_count"),
239                last_requeued_at: row.get("last_requeued_at"),
240                notes: row.get("notes"),
241            })
242            .collect();
243
244        // Record DLQ age metrics for monitoring
245        let now = Utc::now();
246        for job in &jobs {
247            let age_seconds = (now - job.failed_at).num_seconds() as f64;
248            crate::metrics::record_dlq_age(&job.task_identifier, age_seconds);
249        }
250
251        // Get total count for pagination (simplified - could be optimized)
252        let count_query = format!("SELECT COUNT(*) FROM {}.backfill_dlq", self.schema);
253        let total: i64 = sqlx::query_scalar(&count_query).fetch_one(&self.pool).await?;
254
255        Ok(DlqJobList {
256            jobs,
257            total: total as u32,
258            offset: filter.offset.unwrap_or(0),
259            limit: filter.limit.unwrap_or(50),
260        })
261    }
262
263    /// Get a single DLQ job by ID.
264    pub async fn get_dlq_job(&self, dlq_id: i64) -> Result<Option<DlqJob>, BackfillError> {
265        let query = format!(
266            r#"
267            SELECT id, original_job_id, task_identifier, payload, queue_name, priority,
268                   job_key, max_attempts, failure_reason, failure_count, last_error,
269                   original_created_at, original_run_at, failed_at, requeued_count,
270                   last_requeued_at, notes
271            FROM {}.backfill_dlq
272            WHERE id = $1
273        "#,
274            self.schema
275        );
276
277        let row = sqlx::query(&query).bind(dlq_id).fetch_optional(&self.pool).await?;
278
279        Ok(row.map(|row| {
280            let job = DlqJob {
281                id: row.get("id"),
282                original_job_id: row.get("original_job_id"),
283                task_identifier: row.get("task_identifier"),
284                payload: row.get("payload"),
285                queue_name: row.get("queue_name"),
286                priority: row.get("priority"),
287                job_key: row.get("job_key"),
288                max_attempts: row.get("max_attempts"),
289                failure_reason: row.get("failure_reason"),
290                failure_count: row.get("failure_count"),
291                last_error: row.get("last_error"),
292                original_created_at: row.get("original_created_at"),
293                original_run_at: row.get("original_run_at"),
294                failed_at: row.get("failed_at"),
295                requeued_count: row.get("requeued_count"),
296                last_requeued_at: row.get("last_requeued_at"),
297                notes: row.get("notes"),
298            };
299
300            // Record DLQ age metric
301            let age_seconds = (Utc::now() - job.failed_at).num_seconds() as f64;
302            crate::metrics::record_dlq_age(&job.task_identifier, age_seconds);
303
304            job
305        }))
306    }
307
308    /// Requeue a job from the DLQ back to the main queue.
309    pub async fn requeue_dlq_job(&self, dlq_id: i64, notes: Option<String>) -> Result<Job, BackfillError> {
310        // Get the DLQ job
311        let dlq_job = self
312            .get_dlq_job(dlq_id)
313            .await?
314            .ok_or_else(|| BackfillError::DlqJobNotFound(dlq_id))?;
315
316        // Create job spec from DLQ job data
317        // Empty queue name means parallel execution; non-empty means serial
318        let queue = if dlq_job.queue_name.is_empty() {
319            Queue::Parallel
320        } else {
321            Queue::Serial(dlq_job.queue_name.clone())
322        };
323
324        let spec = JobSpec {
325            run_at: None, // Run immediately
326            priority: Priority(dlq_job.priority as i16),
327            queue,
328            max_attempts: dlq_job.max_attempts,
329            retry_policy: None, // Use default retry policy
330            job_key: dlq_job.job_key.clone(),
331            job_key_mode: JobKeyMode::Replace,
332        };
333
334        // Enqueue the job
335        let outcome = self
336            .enqueue(&dlq_job.task_identifier, &dlq_job.payload, spec.clone())
337            .await?;
338
339        let job = match outcome {
340            crate::EnqueueOutcome::Enqueued(job) => job,
341            crate::EnqueueOutcome::AlreadyInProgress { job_key } => {
342                return Err(BackfillError::RuntimeError(format!(
343                    "Cannot requeue DLQ job {}: a job with key '{}' is already in progress",
344                    dlq_id, job_key
345                )));
346            }
347        };
348
349        // Record metrics
350        crate::metrics::record_dlq_job_requeued(&dlq_job.task_identifier, spec.queue.as_str());
351
352        log::info!(
353            "Job requeued from DLQ (dlq_id: {}, job_id: {}, task: {})",
354            dlq_id,
355            job.id(),
356            dlq_job.task_identifier
357        );
358
359        // Update the DLQ record
360        let update_query = format!(
361            r#"
362            UPDATE {}.backfill_dlq
363            SET requeued_count = requeued_count + 1,
364                last_requeued_at = NOW(),
365                notes = $1
366            WHERE id = $2
367        "#,
368            self.schema
369        );
370
371        sqlx::query(&update_query)
372            .bind(&notes)
373            .bind(dlq_id)
374            .execute(&self.pool)
375            .await?;
376
377        Ok(*job)
378    }
379
380    /// Delete a job from the DLQ permanently.
381    pub async fn delete_dlq_job(&self, dlq_id: i64) -> Result<bool, BackfillError> {
382        // Get the job first to record task identifier in metrics
383        let task_identifier = if let Some(job) = self.get_dlq_job(dlq_id).await? {
384            Some(job.task_identifier.clone())
385        } else {
386            None
387        };
388
389        let query = format!("DELETE FROM {}.backfill_dlq WHERE id = $1", self.schema);
390        let result = sqlx::query(&query).bind(dlq_id).execute(&self.pool).await?;
391
392        let deleted = result.rows_affected() > 0;
393
394        if deleted && let Some(task) = task_identifier {
395            crate::metrics::record_dlq_job_deleted(&task);
396            log::info!("Job deleted from DLQ (dlq_id: {}, task: {})", dlq_id, task);
397        }
398
399        Ok(deleted)
400    }
401
402    /// Delete DLQ entries by job_key.
403    ///
404    /// This is called when a job with a job_key completes successfully to clean
405    /// up any associated DLQ entries. This prevents requeued jobs from being
406    /// requeued again after they succeed.
407    ///
408    /// Returns the number of entries deleted.
409    pub async fn delete_dlq_by_job_key(&self, job_key: &str) -> Result<u64, BackfillError> {
410        let query = format!("DELETE FROM {}.backfill_dlq WHERE job_key = $1", self.schema);
411        let result = sqlx::query(&query).bind(job_key).execute(&self.pool).await?;
412
413        let deleted = result.rows_affected();
414
415        if deleted > 0 {
416            crate::metrics::record_dlq_job_deleted("(by_job_key)");
417            log::info!(
418                "DLQ entries deleted by job_key (job_key: {}, count: {})",
419                job_key,
420                deleted
421            );
422        }
423
424        Ok(deleted)
425    }
426
427    /// Get DLQ statistics for monitoring and dashboards.
428    pub async fn dlq_stats(&self) -> Result<DlqStats, BackfillError> {
429        let query = format!(
430            r#"
431            SELECT
432                COUNT(*) as total_jobs,
433                COUNT(DISTINCT task_identifier) as unique_tasks,
434                COUNT(DISTINCT queue_name) as unique_queues,
435                COALESCE(AVG(failure_count)::FLOAT8, 0) as avg_failure_count,
436                COALESCE(SUM(requeued_count), 0) as total_requeued,
437                MIN(failed_at) as oldest_failure,
438                MAX(failed_at) as newest_failure
439            FROM {}.backfill_dlq
440        "#,
441            self.schema
442        );
443
444        let row = sqlx::query(&query).fetch_one(&self.pool).await?;
445
446        let task_breakdown_query = format!(
447            r#"
448            SELECT task_identifier, COUNT(*) as count
449            FROM {}.backfill_dlq
450            GROUP BY task_identifier
451            ORDER BY count DESC
452            LIMIT 10
453        "#,
454            self.schema
455        );
456
457        let task_rows = sqlx::query(&task_breakdown_query).fetch_all(&self.pool).await?;
458
459        let task_breakdown: Vec<(String, u32)> = task_rows
460            .into_iter()
461            .map(|row| (row.get("task_identifier"), row.get::<i64, _>("count") as u32))
462            .collect();
463
464        let stats = DlqStats {
465            total_jobs: row.get::<i64, _>("total_jobs") as u32,
466            unique_tasks: row.get::<i64, _>("unique_tasks") as u32,
467            unique_queues: row.get::<i64, _>("unique_queues") as u32,
468            avg_failure_count: row.get::<Option<f64>, _>("avg_failure_count").unwrap_or(0.0),
469            total_requeued: row.get::<i64, _>("total_requeued") as u32,
470            oldest_failure: row.get("oldest_failure"),
471            newest_failure: row.get("newest_failure"),
472            task_breakdown: task_breakdown.clone(),
473        };
474
475        // Update DLQ size gauge metrics
476        crate::metrics::update_dlq_size(stats.total_jobs);
477
478        // Update per-task breakdown
479        for (task, count) in &task_breakdown {
480            crate::metrics::update_dlq_size_by_task(task, *count);
481        }
482
483        Ok(stats)
484    }
485
    /// Add a job to the DLQ. This is typically called internally when a job
    /// fails permanently after exhausting all retries.
    ///
    /// # Arguments
    /// * `original_job` - the failed job from the main queue; its id, task,
    ///   payload, priority, key, attempt count and timestamps are copied
    ///   into the DLQ row
    /// * `failure_reason` - human-readable explanation stored with the entry
    /// * `last_error` - optional structured error details (stored as JSONB)
    ///
    /// # Errors
    /// Returns a [`BackfillError`] if the queue-name lookup or the UPSERT
    /// fails.
    pub async fn add_to_dlq(
        &self,
        original_job: &Job,
        failure_reason: &str,
        last_error: Option<serde_json::Value>,
    ) -> Result<DlqJob, BackfillError> {
        // Timer for the db-operation duration metric recorded at the end.
        let start = std::time::Instant::now();

        // Query queue_name from job_queue_id.
        // For parallel jobs (no queue_id), use empty string so requeue preserves
        // parallel execution. The requeue logic treats empty string as Queue::Parallel.
        let queue_name = if let Some(queue_id) = original_job.job_queue_id() {
            let query = format!(
                "SELECT queue_name FROM {}._private_job_queues WHERE id = $1",
                self.schema
            );
            sqlx::query_scalar::<_, String>(&query)
                .bind(queue_id)
                .fetch_optional(&self.pool)
                .await?
                .unwrap_or_else(|| "default".to_string())
        } else {
            // Parallel jobs have no queue_id - use empty string to preserve
            // parallel execution when requeued (is_empty() check in requeue_dlq_job)
            String::new()
        };

        // Use UPSERT to handle the case where a requeued job fails again.
        // If a DLQ entry with the same job_key already exists, update it
        // instead of creating a duplicate. This ensures one DLQ entry per
        // logical job and keeps failed_at current for cooldown calculations.
        // (The format string mixes a positional `{}` with the named `{schema}`
        // argument; both refer to self.schema.)
        let upsert_query = format!(
            r#"
            INSERT INTO {}.backfill_dlq (
                original_job_id, task_identifier, payload, queue_name, priority,
                job_key, max_attempts, failure_reason, failure_count, last_error,
                original_created_at, original_run_at
            ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
            ON CONFLICT (job_key) WHERE job_key IS NOT NULL DO UPDATE SET
                failed_at = NOW(),
                failure_count = {schema}.backfill_dlq.failure_count + EXCLUDED.failure_count,
                failure_reason = EXCLUDED.failure_reason,
                last_error = EXCLUDED.last_error,
                original_job_id = EXCLUDED.original_job_id
            RETURNING *
        "#,
            self.schema,
            schema = self.schema
        );

        // Bind order must match the INSERT column list above. failure_count
        // ($9) starts at the job's attempt count; on conflict the stored and
        // incoming counts are summed by the UPSERT.
        let row = sqlx::query(&upsert_query)
            .bind(original_job.id())
            .bind(original_job.task_identifier())
            .bind(original_job.payload())
            .bind(queue_name)
            .bind(*original_job.priority())
            .bind(original_job.key())
            .bind(original_job.max_attempts())
            .bind(failure_reason)
            .bind(original_job.attempts())
            .bind(&last_error)
            .bind(original_job.created_at())
            .bind(original_job.run_at())
            .fetch_one(&self.pool)
            .await?;

        // Materialize the RETURNING * row into the public DlqJob type.
        let dlq_job = DlqJob {
            id: row.get("id"),
            original_job_id: row.get("original_job_id"),
            task_identifier: row.get("task_identifier"),
            payload: row.get("payload"),
            queue_name: row.get("queue_name"),
            priority: row.get("priority"),
            job_key: row.get("job_key"),
            max_attempts: row.get("max_attempts"),
            failure_reason: row.get("failure_reason"),
            failure_count: row.get("failure_count"),
            last_error: row.get("last_error"),
            original_created_at: row.get("original_created_at"),
            original_run_at: row.get("original_run_at"),
            failed_at: row.get("failed_at"),
            requeued_count: row.get("requeued_count"),
            last_requeued_at: row.get("last_requeued_at"),
            notes: row.get("notes"),
        };

        // Record metrics
        crate::metrics::record_db_operation("dlq_add", "success");
        crate::metrics::record_db_operation_duration("dlq_add", start.elapsed().as_secs_f64());
        crate::metrics::record_dlq_job_added(&dlq_job.queue_name, &dlq_job.task_identifier, &dlq_job.failure_reason);

        log::info!(
            "Job moved to DLQ (dlq_id: {}, task: {}, failure_reason: {})",
            dlq_job.id,
            dlq_job.task_identifier,
            dlq_job.failure_reason
        );

        Ok(dlq_job)
    }
588
589    /// Scan for permanently failed jobs and move them to the Dead Letter Queue.
590    ///
591    /// This method looks for jobs that have exhausted their retry attempts and
592    /// moves them to the DLQ for manual inspection and potential reprocessing.
593    ///
594    /// Returns the number of jobs moved to the DLQ.
595    pub async fn process_failed_jobs(&self) -> Result<u32, BackfillError> {
596        // Find jobs that have failed permanently (attempts >= max_attempts)
597        // and haven't been processed yet
598        let find_failed_jobs_query = format!(
599            r#"
600            SELECT jobs.id, tasks.identifier AS task_identifier,
601                   job_queues.queue_name, jobs.priority, jobs.key as job_key,
602                   jobs.max_attempts, jobs.attempts, jobs.last_error,
603                   jobs.created_at, jobs.run_at, jobs.updated_at, jobs.payload
604            FROM {}._private_jobs AS jobs
605            INNER JOIN {}._private_tasks AS tasks ON tasks.id = jobs.task_id
606            LEFT JOIN {}._private_job_queues AS job_queues ON job_queues.id = jobs.job_queue_id
607            WHERE jobs.attempts >= jobs.max_attempts
608              AND jobs.max_attempts > 0
609              AND jobs.id NOT IN (SELECT COALESCE(original_job_id, -1) FROM {}.backfill_dlq)
610            ORDER BY jobs.updated_at ASC
611            LIMIT 100
612        "#,
613            self.schema, self.schema, self.schema, self.schema
614        );
615
616        let failed_jobs = sqlx::query(&find_failed_jobs_query).fetch_all(&self.pool).await?;
617
618        let mut moved_count = 0;
619
620        for job_row in failed_jobs {
621            let job_id: i64 = job_row.get("id");
622            let task_identifier: String = job_row.get("task_identifier");
623            let payload: serde_json::Value = job_row.get("payload");
624            let queue_name: Option<String> = job_row.get("queue_name");
625            // Use empty string for parallel jobs (NULL queue_name) to preserve
626            // parallel execution when requeued
627            let queue_name = queue_name.unwrap_or_default();
628            let priority: i16 = job_row.get("priority");
629            let job_key: Option<String> = job_row.get("job_key");
630            let max_attempts: i16 = job_row.get("max_attempts");
631            let attempts: i16 = job_row.get("attempts");
632            let last_error: Option<String> = job_row.get("last_error");
633            let created_at: chrono::DateTime<chrono::Utc> = job_row.get("created_at");
634            let run_at: chrono::DateTime<chrono::Utc> = job_row.get("run_at");
635
636            // Convert last_error from TEXT to JSONB for DLQ table
637            let last_error_json = last_error.map(serde_json::Value::String);
638
639            // Move to DLQ using UPSERT to handle requeued jobs that fail again
640            let upsert_dlq_query = format!(
641                r#"
642                INSERT INTO {schema}.backfill_dlq (
643                    original_job_id, task_identifier, payload, queue_name, priority,
644                    job_key, max_attempts, failure_reason, failure_count, last_error,
645                    original_created_at, original_run_at
646                ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)
647                ON CONFLICT (job_key) WHERE job_key IS NOT NULL DO UPDATE SET
648                    failed_at = NOW(),
649                    failure_count = {schema}.backfill_dlq.failure_count + EXCLUDED.failure_count,
650                    failure_reason = EXCLUDED.failure_reason,
651                    last_error = EXCLUDED.last_error,
652                    original_job_id = EXCLUDED.original_job_id
653            "#,
654                schema = self.schema
655            );
656
657            let failure_reason = format!("Job exceeded maximum retry attempts ({}/{})", attempts, max_attempts);
658
659            let upsert_result = sqlx::query(&upsert_dlq_query)
660                .bind(job_id)
661                .bind(&task_identifier)
662                .bind(&payload)
663                .bind(&queue_name)
664                .bind(priority)
665                .bind(&job_key)
666                .bind(max_attempts as i32)
667                .bind(failure_reason)
668                .bind(attempts as i32)
669                .bind(&last_error_json)
670                .bind(created_at)
671                .bind(run_at)
672                .execute(&self.pool)
673                .await;
674
675            match upsert_result {
676                Ok(_) => {
677                    // Successfully moved to DLQ, now remove from main jobs table
678                    let delete_query = format!("DELETE FROM {}._private_jobs WHERE id = $1", self.schema);
679                    match sqlx::query(&delete_query).bind(job_id).execute(&self.pool).await {
680                        Ok(_) => {
681                            moved_count += 1;
682                            log::info!(
683                                "Successfully moved failed job to DLQ (job_id: {}, task: {}, attempts: {}/{})",
684                                job_id,
685                                task_identifier,
686                                attempts,
687                                max_attempts
688                            );
689                        }
690                        Err(e) => {
691                            log::error!(
692                                "Failed to delete job from main table after DLQ insertion (job_id: {}, task: {}, error: {})",
693                                job_id,
694                                task_identifier,
695                                e
696                            );
697                            // Consider this a partial failure - job is in DLQ
698                            // but also still in main table
699                        }
700                    }
701                }
702                Err(e) => {
703                    log::error!(
704                        "Failed to insert job into DLQ (job_id: {}, task: {}, error: {})",
705                        job_id,
706                        task_identifier,
707                        e
708                    );
709                }
710            }
711        }
712
713        if moved_count > 0 {
714            log::info!("DLQ processing completed (moved_count: {})", moved_count);
715        }
716
717        Ok(moved_count)
718    }
719
720    /// Process failed jobs continuously in a background task.
721    ///
722    /// This spawns a background task that periodically scans for failed jobs
723    /// and moves them to the DLQ. The task runs until the provided cancellation
724    /// token is triggered.
725    ///
726    /// # Arguments
727    /// * `interval` - How often to scan for failed jobs
728    /// * `cancellation_token` - Token to signal when to stop the background
729    ///   task
730    ///
731    /// # Returns
732    /// A JoinHandle for the background task
733    pub fn start_dlq_processor(
734        &self,
735        interval: std::time::Duration,
736        cancellation_token: tokio_util::sync::CancellationToken,
737    ) -> tokio::task::JoinHandle<()> {
738        let client = self.clone();
739
740        tokio::spawn(async move {
741            let mut interval_timer = tokio::time::interval(interval);
742            interval_timer.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip);
743
744            log::info!(
745                "Starting DLQ processor background task (interval_seconds: {})",
746                interval.as_secs()
747            );
748
749            loop {
750                tokio::select! {
751                    _ = cancellation_token.cancelled() => {
752                        log::info!("DLQ processor shutting down");
753                        break;
754                    }
755                    _ = interval_timer.tick() => {
756                        match client.process_failed_jobs().await {
757                            Ok(count) if count > 0 => {
758                                log::info!("DLQ processor moved failed jobs (moved_jobs: {})", count);
759                            }
760                            Ok(_) => {
761                                // No jobs moved, no need to log
762                            }
763                            Err(e) => {
764                                log::error!("DLQ processor encountered error: {}", e);
765                            }
766                        }
767                    }
768                }
769            }
770        })
771    }
772}