// awa_worker/executor.rs

1use crate::context::JobContext;
2use awa_model::{AwaError, JobRow};
3use sqlx::PgPool;
4use std::any::Any;
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
7use std::sync::Arc;
8use tokio::sync::RwLock;
9use tracing::{error, info, info_span, warn, Instrument};
10
/// Outcome a [`Worker`] returns in the `Ok` case of a handler run.
///
/// Derives `Clone`/`PartialEq`/`Eq` so outcomes can be compared and copied by
/// callers and tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum JobResult {
    /// The job finished successfully.
    Completed,
    /// Retry the job after the given delay. The attempt consumed by this run is
    /// kept (it is not decremented), so it counts toward `max_attempts`.
    RetryAfter(std::time::Duration),
    /// Re-run the job after the given delay WITHOUT consuming an attempt.
    Snooze(std::time::Duration),
    /// Cancel the job; the string is the cancellation reason.
    Cancel(String),
}
23
24/// Error type for job handlers — any error is retryable unless it's terminal.
25#[derive(Debug, thiserror::Error)]
26pub enum JobError {
27    /// Retryable error — will be retried if attempts remain.
28    #[error("{0}")]
29    Retryable(#[source] Box<dyn std::error::Error + Send + Sync>),
30
31    /// Terminal error — immediately fails the job regardless of remaining attempts.
32    #[error("terminal: {0}")]
33    Terminal(String),
34}
35
36impl JobError {
37    pub fn retryable(err: impl std::error::Error + Send + Sync + 'static) -> Self {
38        JobError::Retryable(Box::new(err))
39    }
40
41    pub fn terminal(msg: impl Into<String>) -> Self {
42        JobError::Terminal(msg.into())
43    }
44}
45
46/// Worker trait — implement this for each job type.
47#[async_trait::async_trait]
48pub trait Worker: Send + Sync + 'static {
49    /// The kind string for this worker (must match the job's kind).
50    fn kind(&self) -> &'static str;
51
52    /// Execute the job. The raw args JSON and context are provided.
53    async fn perform(&self, job_row: &JobRow, ctx: &JobContext) -> Result<JobResult, JobError>;
54}
55
56/// Type-erased worker wrapper for the registry.
57pub(crate) type BoxedWorker = Box<dyn Worker>;
58
59/// Manages job execution — spawns worker futures and tracks in-flight jobs.
60pub struct JobExecutor {
61    pool: PgPool,
62    workers: Arc<HashMap<String, BoxedWorker>>,
63    in_flight: Arc<RwLock<HashMap<i64, Arc<AtomicBool>>>>,
64    queue_in_flight: Arc<HashMap<String, Arc<AtomicU32>>>,
65    state: Arc<HashMap<std::any::TypeId, Box<dyn Any + Send + Sync>>>,
66    metrics: crate::metrics::AwaMetrics,
67}
68
69impl JobExecutor {
70    pub fn new(
71        pool: PgPool,
72        workers: Arc<HashMap<String, BoxedWorker>>,
73        in_flight: Arc<RwLock<HashMap<i64, Arc<AtomicBool>>>>,
74        queue_in_flight: Arc<HashMap<String, Arc<AtomicU32>>>,
75        state: Arc<HashMap<std::any::TypeId, Box<dyn Any + Send + Sync>>>,
76        metrics: crate::metrics::AwaMetrics,
77    ) -> Self {
78        Self {
79            pool,
80            workers,
81            in_flight,
82            queue_in_flight,
83            state,
84            metrics,
85        }
86    }
87
88    /// Execute a claimed job. Returns a JoinHandle for the spawned task.
89    pub fn execute(&self, job: JobRow, cancel: Arc<AtomicBool>) -> tokio::task::JoinHandle<()> {
90        let pool = self.pool.clone();
91        let workers = self.workers.clone();
92        let in_flight = self.in_flight.clone();
93        let queue_in_flight = self.queue_in_flight.clone();
94        let state = self.state.clone();
95        let metrics = self.metrics.clone();
96        let job_id = job.id;
97        let job_kind = job.kind.clone();
98        let job_queue = job.queue.clone();
99
100        let span = info_span!(
101            "job.execute",
102            job.id = job_id,
103            job.kind = %job_kind,
104            job.queue = %job_queue,
105            job.attempt = job.attempt,
106            otel.name = %format!("job.execute {}", job_kind),
107            otel.status_code = tracing::field::Empty,
108        );
109
110        tokio::spawn(
111            async move {
112                // Register as in-flight
113                {
114                    let mut guard = in_flight.write().await;
115                    guard.insert(job_id, cancel.clone());
116                }
117                if let Some(counter) = queue_in_flight.get(&job_queue) {
118                    counter.fetch_add(1, Ordering::SeqCst);
119                }
120                metrics.record_in_flight_change(&job_queue, 1);
121
122                let start = std::time::Instant::now();
123                let ctx = JobContext::new(job.clone(), cancel, state);
124
125                let result = match workers.get(&job.kind) {
126                    Some(worker) => worker.perform(&job, &ctx).await,
127                    None => {
128                        error!(kind = %job.kind, job_id, "No worker registered for job kind");
129                        Err(JobError::Terminal(format!(
130                            "unknown job kind: {}",
131                            job.kind
132                        )))
133                    }
134                };
135
136                let duration = start.elapsed();
137
138                // Complete the job based on the result, then record metrics
139                // only if the state transition actually happened (not stale).
140                match complete_job(&pool, &job, &result).await {
141                    Ok(true) => {
142                        // State transition succeeded — record metrics
143                        match &result {
144                            Ok(JobResult::Completed) => {
145                                metrics.record_job_completed(&job_kind, &job_queue, duration);
146                            }
147                            Ok(JobResult::RetryAfter(_)) => {
148                                metrics.record_job_retried(&job_kind, &job_queue);
149                            }
150                            Ok(JobResult::Cancel(_)) => {
151                                metrics.jobs_cancelled.add(
152                                    1,
153                                    &[
154                                        opentelemetry::KeyValue::new(
155                                            "awa.job.kind",
156                                            job_kind.clone(),
157                                        ),
158                                        opentelemetry::KeyValue::new(
159                                            "awa.job.queue",
160                                            job_queue.clone(),
161                                        ),
162                                    ],
163                                );
164                            }
165                            Ok(JobResult::Snooze(_)) => {} // Not a terminal outcome
166                            Err(JobError::Terminal(_)) => {
167                                metrics.record_job_failed(&job_kind, &job_queue, true);
168                            }
169                            Err(JobError::Retryable(_)) => {
170                                metrics.record_job_retried(&job_kind, &job_queue);
171                            }
172                        }
173                    }
174                    Ok(false) => {
175                        // Job was already rescued/cancelled — no metrics
176                    }
177                    Err(err) => {
178                        error!(job_id, error = %err, "Failed to complete job");
179                    }
180                }
181
182                // Remove from in-flight
183                {
184                    let mut guard = in_flight.write().await;
185                    guard.remove(&job_id);
186                }
187                if let Some(counter) = queue_in_flight.get(&job_queue) {
188                    counter.fetch_sub(1, Ordering::SeqCst);
189                }
190                metrics.record_in_flight_change(&job_queue, -1);
191            }
192            .instrument(span),
193        )
194    }
195}
196
197/// Update job state in the database based on handler result.
198///
199/// Returns `true` if the state transition happened, `false` if the job was
200/// already rescued/cancelled by maintenance (stale completion).
201async fn complete_job(
202    pool: &PgPool,
203    job: &JobRow,
204    result: &Result<JobResult, JobError>,
205) -> Result<bool, AwaError> {
206    match result {
207        Ok(JobResult::Completed) => {
208            tracing::Span::current().record("otel.status_code", "OK");
209            info!(job_id = job.id, kind = %job.kind, attempt = job.attempt, "Job completed");
210            let result = sqlx::query(
211                "UPDATE awa.jobs SET state = 'completed', finalized_at = now() WHERE id = $1 AND state = 'running'",
212            )
213            .bind(job.id)
214            .execute(pool)
215            .await?;
216            if result.rows_affected() == 0 {
217                warn!(
218                    job_id = job.id,
219                    "Job already rescued/cancelled, completion ignored"
220                );
221                return Ok(false);
222            }
223        }
224
225        Ok(JobResult::RetryAfter(duration)) => {
226            let seconds = duration.as_secs() as f64;
227            info!(
228                job_id = job.id,
229                kind = %job.kind,
230                retry_after_secs = seconds,
231                "Job requested retry after duration"
232            );
233            let result = sqlx::query(
234                r#"
235                UPDATE awa.jobs
236                SET state = 'retryable',
237                    run_at = now() + make_interval(secs => $2),
238                    finalized_at = now()
239                WHERE id = $1 AND state = 'running'
240                "#,
241            )
242            .bind(job.id)
243            .bind(seconds)
244            .execute(pool)
245            .await?;
246            if result.rows_affected() == 0 {
247                warn!(
248                    job_id = job.id,
249                    "Job already rescued/cancelled, retry ignored"
250                );
251                return Ok(false);
252            }
253        }
254
255        Ok(JobResult::Snooze(duration)) => {
256            let seconds = duration.as_secs() as f64;
257            info!(
258                job_id = job.id,
259                kind = %job.kind,
260                snooze_secs = seconds,
261                "Job snoozed (attempt not incremented)"
262            );
263            // Snooze: back to available with new run_at, decrement attempt
264            // (since it was already incremented at claim time)
265            let result = sqlx::query(
266                r#"
267                UPDATE awa.jobs
268                SET state = 'scheduled',
269                    run_at = now() + make_interval(secs => $2),
270                    attempt = attempt - 1,
271                    heartbeat_at = NULL,
272                    deadline_at = NULL
273                WHERE id = $1 AND state = 'running'
274                "#,
275            )
276            .bind(job.id)
277            .bind(seconds)
278            .execute(pool)
279            .await?;
280            if result.rows_affected() == 0 {
281                warn!(
282                    job_id = job.id,
283                    "Job already rescued/cancelled, snooze ignored"
284                );
285                return Ok(false);
286            }
287        }
288
289        Ok(JobResult::Cancel(reason)) => {
290            info!(
291                job_id = job.id,
292                kind = %job.kind,
293                reason = %reason,
294                "Job cancelled by handler"
295            );
296            let result = sqlx::query(
297                r#"
298                UPDATE awa.jobs
299                SET state = 'cancelled',
300                    finalized_at = now(),
301                    errors = errors || $2::jsonb
302                WHERE id = $1 AND state = 'running'
303                "#,
304            )
305            .bind(job.id)
306            .bind(serde_json::json!({
307                "error": format!("cancelled: {}", reason),
308                "attempt": job.attempt,
309                "at": chrono::Utc::now().to_rfc3339()
310            }))
311            .execute(pool)
312            .await?;
313            if result.rows_affected() == 0 {
314                warn!(
315                    job_id = job.id,
316                    "Job already rescued/cancelled, cancel ignored"
317                );
318                return Ok(false);
319            }
320        }
321
322        Err(JobError::Terminal(msg)) => {
323            tracing::Span::current().record("otel.status_code", "ERROR");
324            error!(
325                job_id = job.id,
326                kind = %job.kind,
327                error = %msg,
328                "Job failed terminally"
329            );
330            let result = sqlx::query(
331                r#"
332                UPDATE awa.jobs
333                SET state = 'failed',
334                    finalized_at = now(),
335                    errors = errors || $2::jsonb
336                WHERE id = $1 AND state = 'running'
337                "#,
338            )
339            .bind(job.id)
340            .bind(serde_json::json!({
341                "error": msg.to_string(),
342                "attempt": job.attempt,
343                "at": chrono::Utc::now().to_rfc3339(),
344                "terminal": true
345            }))
346            .execute(pool)
347            .await?;
348            if result.rows_affected() == 0 {
349                warn!(
350                    job_id = job.id,
351                    "Job already rescued/cancelled, terminal failure ignored"
352                );
353                return Ok(false);
354            }
355        }
356
357        Err(JobError::Retryable(err)) => {
358            let error_msg = err.to_string();
359            if job.attempt >= job.max_attempts {
360                tracing::Span::current().record("otel.status_code", "ERROR");
361                error!(
362                    job_id = job.id,
363                    kind = %job.kind,
364                    attempt = job.attempt,
365                    max_attempts = job.max_attempts,
366                    error = %error_msg,
367                    "Job failed (max attempts exhausted)"
368                );
369                let result = sqlx::query(
370                    r#"
371                    UPDATE awa.jobs
372                    SET state = 'failed',
373                        finalized_at = now(),
374                        errors = errors || $2::jsonb
375                    WHERE id = $1 AND state = 'running'
376                    "#,
377                )
378                .bind(job.id)
379                .bind(serde_json::json!({
380                    "error": error_msg,
381                    "attempt": job.attempt,
382                    "at": chrono::Utc::now().to_rfc3339()
383                }))
384                .execute(pool)
385                .await?;
386                if result.rows_affected() == 0 {
387                    warn!(
388                        job_id = job.id,
389                        "Job already rescued/cancelled, failure ignored"
390                    );
391                    return Ok(false);
392                }
393            } else {
394                warn!(
395                    job_id = job.id,
396                    kind = %job.kind,
397                    attempt = job.attempt,
398                    error = %error_msg,
399                    "Job failed (will retry)"
400                );
401                // Use database-side backoff calculation
402                let result = sqlx::query(
403                    r#"
404                    UPDATE awa.jobs
405                    SET state = 'retryable',
406                        run_at = now() + awa.backoff_duration($2, $3),
407                        finalized_at = now(),
408                        heartbeat_at = NULL,
409                        deadline_at = NULL,
410                        errors = errors || $4::jsonb
411                    WHERE id = $1 AND state = 'running'
412                    "#,
413                )
414                .bind(job.id)
415                .bind(job.attempt)
416                .bind(job.max_attempts)
417                .bind(serde_json::json!({
418                    "error": error_msg,
419                    "attempt": job.attempt,
420                    "at": chrono::Utc::now().to_rfc3339()
421                }))
422                .execute(pool)
423                .await?;
424                if result.rows_affected() == 0 {
425                    warn!(
426                        job_id = job.id,
427                        "Job already rescued/cancelled, retry ignored"
428                    );
429                    return Ok(false);
430                }
431            }
432        }
433    }
434
435    Ok(true)
436}