1use crate::context::JobContext;
2use awa_model::{AwaError, JobRow};
3use sqlx::PgPool;
4use std::any::Any;
5use std::collections::HashMap;
6use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
7use std::sync::Arc;
8use tokio::sync::RwLock;
9use tracing::{error, info, info_span, warn, Instrument};
10
/// Outcome a [`Worker`] reports when `perform` returns successfully.
///
/// Each variant selects which state transition is written back to the job row
/// once the handler has finished.
#[derive(Debug)]
pub enum JobResult {
    /// The job finished; the row is moved to the `completed` state.
    Completed,

    /// The handler asks for another run after the given delay; the row is
    /// moved to the `retryable` state with a new `run_at`.
    RetryAfter(std::time::Duration),

    /// The handler asks to be re-scheduled after the given delay; the row is
    /// moved to the `scheduled` state and its attempt counter is decremented,
    /// so the snoozed run is not counted as an attempt.
    Snooze(std::time::Duration),

    /// The handler cancels the job; the row is moved to the `cancelled` state
    /// and the given reason is appended to the job's error list.
    Cancel(String),
}
23
24#[derive(Debug, thiserror::Error)]
26pub enum JobError {
27 #[error("{0}")]
29 Retryable(#[source] Box<dyn std::error::Error + Send + Sync>),
30
31 #[error("terminal: {0}")]
33 Terminal(String),
34}
35
36impl JobError {
37 pub fn retryable(err: impl std::error::Error + Send + Sync + 'static) -> Self {
38 JobError::Retryable(Box::new(err))
39 }
40
41 pub fn terminal(msg: impl Into<String>) -> Self {
42 JobError::Terminal(msg.into())
43 }
44}
45
46#[async_trait::async_trait]
48pub trait Worker: Send + Sync + 'static {
49 fn kind(&self) -> &'static str;
51
52 async fn perform(&self, job_row: &JobRow, ctx: &JobContext) -> Result<JobResult, JobError>;
54}
55
56pub(crate) type BoxedWorker = Box<dyn Worker>;
58
59pub struct JobExecutor {
61 pool: PgPool,
62 workers: Arc<HashMap<String, BoxedWorker>>,
63 in_flight: Arc<RwLock<HashMap<i64, Arc<AtomicBool>>>>,
64 queue_in_flight: Arc<HashMap<String, Arc<AtomicU32>>>,
65 state: Arc<HashMap<std::any::TypeId, Box<dyn Any + Send + Sync>>>,
66 metrics: crate::metrics::AwaMetrics,
67}
68
69impl JobExecutor {
70 pub fn new(
71 pool: PgPool,
72 workers: Arc<HashMap<String, BoxedWorker>>,
73 in_flight: Arc<RwLock<HashMap<i64, Arc<AtomicBool>>>>,
74 queue_in_flight: Arc<HashMap<String, Arc<AtomicU32>>>,
75 state: Arc<HashMap<std::any::TypeId, Box<dyn Any + Send + Sync>>>,
76 metrics: crate::metrics::AwaMetrics,
77 ) -> Self {
78 Self {
79 pool,
80 workers,
81 in_flight,
82 queue_in_flight,
83 state,
84 metrics,
85 }
86 }
87
88 pub fn execute(&self, job: JobRow, cancel: Arc<AtomicBool>) -> tokio::task::JoinHandle<()> {
90 let pool = self.pool.clone();
91 let workers = self.workers.clone();
92 let in_flight = self.in_flight.clone();
93 let queue_in_flight = self.queue_in_flight.clone();
94 let state = self.state.clone();
95 let metrics = self.metrics.clone();
96 let job_id = job.id;
97 let job_kind = job.kind.clone();
98 let job_queue = job.queue.clone();
99
100 let span = info_span!(
101 "job.execute",
102 job.id = job_id,
103 job.kind = %job_kind,
104 job.queue = %job_queue,
105 job.attempt = job.attempt,
106 otel.name = %format!("job.execute {}", job_kind),
107 otel.status_code = tracing::field::Empty,
108 );
109
110 tokio::spawn(
111 async move {
112 {
114 let mut guard = in_flight.write().await;
115 guard.insert(job_id, cancel.clone());
116 }
117 if let Some(counter) = queue_in_flight.get(&job_queue) {
118 counter.fetch_add(1, Ordering::SeqCst);
119 }
120 metrics.record_in_flight_change(&job_queue, 1);
121
122 let start = std::time::Instant::now();
123 let ctx = JobContext::new(job.clone(), cancel, state);
124
125 let result = match workers.get(&job.kind) {
126 Some(worker) => worker.perform(&job, &ctx).await,
127 None => {
128 error!(kind = %job.kind, job_id, "No worker registered for job kind");
129 Err(JobError::Terminal(format!(
130 "unknown job kind: {}",
131 job.kind
132 )))
133 }
134 };
135
136 let duration = start.elapsed();
137
138 match complete_job(&pool, &job, &result).await {
141 Ok(true) => {
142 match &result {
144 Ok(JobResult::Completed) => {
145 metrics.record_job_completed(&job_kind, &job_queue, duration);
146 }
147 Ok(JobResult::RetryAfter(_)) => {
148 metrics.record_job_retried(&job_kind, &job_queue);
149 }
150 Ok(JobResult::Cancel(_)) => {
151 metrics.jobs_cancelled.add(
152 1,
153 &[
154 opentelemetry::KeyValue::new(
155 "awa.job.kind",
156 job_kind.clone(),
157 ),
158 opentelemetry::KeyValue::new(
159 "awa.job.queue",
160 job_queue.clone(),
161 ),
162 ],
163 );
164 }
165 Ok(JobResult::Snooze(_)) => {} Err(JobError::Terminal(_)) => {
167 metrics.record_job_failed(&job_kind, &job_queue, true);
168 }
169 Err(JobError::Retryable(_)) => {
170 metrics.record_job_retried(&job_kind, &job_queue);
171 }
172 }
173 }
174 Ok(false) => {
175 }
177 Err(err) => {
178 error!(job_id, error = %err, "Failed to complete job");
179 }
180 }
181
182 {
184 let mut guard = in_flight.write().await;
185 guard.remove(&job_id);
186 }
187 if let Some(counter) = queue_in_flight.get(&job_queue) {
188 counter.fetch_sub(1, Ordering::SeqCst);
189 }
190 metrics.record_in_flight_change(&job_queue, -1);
191 }
192 .instrument(span),
193 )
194 }
195}
196
197async fn complete_job(
202 pool: &PgPool,
203 job: &JobRow,
204 result: &Result<JobResult, JobError>,
205) -> Result<bool, AwaError> {
206 match result {
207 Ok(JobResult::Completed) => {
208 tracing::Span::current().record("otel.status_code", "OK");
209 info!(job_id = job.id, kind = %job.kind, attempt = job.attempt, "Job completed");
210 let result = sqlx::query(
211 "UPDATE awa.jobs SET state = 'completed', finalized_at = now() WHERE id = $1 AND state = 'running'",
212 )
213 .bind(job.id)
214 .execute(pool)
215 .await?;
216 if result.rows_affected() == 0 {
217 warn!(
218 job_id = job.id,
219 "Job already rescued/cancelled, completion ignored"
220 );
221 return Ok(false);
222 }
223 }
224
225 Ok(JobResult::RetryAfter(duration)) => {
226 let seconds = duration.as_secs() as f64;
227 info!(
228 job_id = job.id,
229 kind = %job.kind,
230 retry_after_secs = seconds,
231 "Job requested retry after duration"
232 );
233 let result = sqlx::query(
234 r#"
235 UPDATE awa.jobs
236 SET state = 'retryable',
237 run_at = now() + make_interval(secs => $2),
238 finalized_at = now()
239 WHERE id = $1 AND state = 'running'
240 "#,
241 )
242 .bind(job.id)
243 .bind(seconds)
244 .execute(pool)
245 .await?;
246 if result.rows_affected() == 0 {
247 warn!(
248 job_id = job.id,
249 "Job already rescued/cancelled, retry ignored"
250 );
251 return Ok(false);
252 }
253 }
254
255 Ok(JobResult::Snooze(duration)) => {
256 let seconds = duration.as_secs() as f64;
257 info!(
258 job_id = job.id,
259 kind = %job.kind,
260 snooze_secs = seconds,
261 "Job snoozed (attempt not incremented)"
262 );
263 let result = sqlx::query(
266 r#"
267 UPDATE awa.jobs
268 SET state = 'scheduled',
269 run_at = now() + make_interval(secs => $2),
270 attempt = attempt - 1,
271 heartbeat_at = NULL,
272 deadline_at = NULL
273 WHERE id = $1 AND state = 'running'
274 "#,
275 )
276 .bind(job.id)
277 .bind(seconds)
278 .execute(pool)
279 .await?;
280 if result.rows_affected() == 0 {
281 warn!(
282 job_id = job.id,
283 "Job already rescued/cancelled, snooze ignored"
284 );
285 return Ok(false);
286 }
287 }
288
289 Ok(JobResult::Cancel(reason)) => {
290 info!(
291 job_id = job.id,
292 kind = %job.kind,
293 reason = %reason,
294 "Job cancelled by handler"
295 );
296 let result = sqlx::query(
297 r#"
298 UPDATE awa.jobs
299 SET state = 'cancelled',
300 finalized_at = now(),
301 errors = errors || $2::jsonb
302 WHERE id = $1 AND state = 'running'
303 "#,
304 )
305 .bind(job.id)
306 .bind(serde_json::json!({
307 "error": format!("cancelled: {}", reason),
308 "attempt": job.attempt,
309 "at": chrono::Utc::now().to_rfc3339()
310 }))
311 .execute(pool)
312 .await?;
313 if result.rows_affected() == 0 {
314 warn!(
315 job_id = job.id,
316 "Job already rescued/cancelled, cancel ignored"
317 );
318 return Ok(false);
319 }
320 }
321
322 Err(JobError::Terminal(msg)) => {
323 tracing::Span::current().record("otel.status_code", "ERROR");
324 error!(
325 job_id = job.id,
326 kind = %job.kind,
327 error = %msg,
328 "Job failed terminally"
329 );
330 let result = sqlx::query(
331 r#"
332 UPDATE awa.jobs
333 SET state = 'failed',
334 finalized_at = now(),
335 errors = errors || $2::jsonb
336 WHERE id = $1 AND state = 'running'
337 "#,
338 )
339 .bind(job.id)
340 .bind(serde_json::json!({
341 "error": msg.to_string(),
342 "attempt": job.attempt,
343 "at": chrono::Utc::now().to_rfc3339(),
344 "terminal": true
345 }))
346 .execute(pool)
347 .await?;
348 if result.rows_affected() == 0 {
349 warn!(
350 job_id = job.id,
351 "Job already rescued/cancelled, terminal failure ignored"
352 );
353 return Ok(false);
354 }
355 }
356
357 Err(JobError::Retryable(err)) => {
358 let error_msg = err.to_string();
359 if job.attempt >= job.max_attempts {
360 tracing::Span::current().record("otel.status_code", "ERROR");
361 error!(
362 job_id = job.id,
363 kind = %job.kind,
364 attempt = job.attempt,
365 max_attempts = job.max_attempts,
366 error = %error_msg,
367 "Job failed (max attempts exhausted)"
368 );
369 let result = sqlx::query(
370 r#"
371 UPDATE awa.jobs
372 SET state = 'failed',
373 finalized_at = now(),
374 errors = errors || $2::jsonb
375 WHERE id = $1 AND state = 'running'
376 "#,
377 )
378 .bind(job.id)
379 .bind(serde_json::json!({
380 "error": error_msg,
381 "attempt": job.attempt,
382 "at": chrono::Utc::now().to_rfc3339()
383 }))
384 .execute(pool)
385 .await?;
386 if result.rows_affected() == 0 {
387 warn!(
388 job_id = job.id,
389 "Job already rescued/cancelled, failure ignored"
390 );
391 return Ok(false);
392 }
393 } else {
394 warn!(
395 job_id = job.id,
396 kind = %job.kind,
397 attempt = job.attempt,
398 error = %error_msg,
399 "Job failed (will retry)"
400 );
401 let result = sqlx::query(
403 r#"
404 UPDATE awa.jobs
405 SET state = 'retryable',
406 run_at = now() + awa.backoff_duration($2, $3),
407 finalized_at = now(),
408 heartbeat_at = NULL,
409 deadline_at = NULL,
410 errors = errors || $4::jsonb
411 WHERE id = $1 AND state = 'running'
412 "#,
413 )
414 .bind(job.id)
415 .bind(job.attempt)
416 .bind(job.max_attempts)
417 .bind(serde_json::json!({
418 "error": error_msg,
419 "attempt": job.attempt,
420 "at": chrono::Utc::now().to_rfc3339()
421 }))
422 .execute(pool)
423 .await?;
424 if result.rows_affected() == 0 {
425 warn!(
426 job_id = job.id,
427 "Job already rescued/cancelled, retry ignored"
428 );
429 return Ok(false);
430 }
431 }
432 }
433 }
434
435 Ok(true)
436}