1use crate::error::AwaError;
2use crate::job::{JobRow, JobState};
3use chrono::{DateTime, Duration, Utc};
4use serde::{Deserialize, Serialize};
5use sqlx::types::Json;
6use sqlx::PgExecutor;
7use sqlx::PgPool;
8use std::cmp::max;
9use std::collections::HashMap;
10use uuid::Uuid;
11
12pub async fn retry<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
14where
15 E: PgExecutor<'e>,
16{
17 sqlx::query_as::<_, JobRow>(
18 r#"
19 UPDATE awa.jobs
20 SET state = 'available', attempt = 0, run_at = now(),
21 finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
22 callback_id = NULL, callback_timeout_at = NULL,
23 callback_filter = NULL, callback_on_complete = NULL,
24 callback_on_fail = NULL, callback_transform = NULL
25 WHERE id = $1 AND state IN ('failed', 'cancelled', 'waiting_external')
26 RETURNING *
27 "#,
28 )
29 .bind(job_id)
30 .fetch_optional(executor)
31 .await?
32 .ok_or(AwaError::JobNotFound { id: job_id })
33 .map(Some)
34}
35
36pub async fn cancel<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
38where
39 E: PgExecutor<'e>,
40{
41 sqlx::query_as::<_, JobRow>(
42 r#"
43 UPDATE awa.jobs
44 SET state = 'cancelled', finalized_at = now(),
45 callback_id = NULL, callback_timeout_at = NULL,
46 callback_filter = NULL, callback_on_complete = NULL,
47 callback_on_fail = NULL, callback_transform = NULL
48 WHERE id = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
49 RETURNING *
50 "#,
51 )
52 .bind(job_id)
53 .fetch_optional(executor)
54 .await?
55 .ok_or(AwaError::JobNotFound { id: job_id })
56 .map(Some)
57}
58
59pub async fn cancel_by_unique_key<'e, E>(
95 executor: E,
96 kind: &str,
97 queue: Option<&str>,
98 args: Option<&serde_json::Value>,
99 period_bucket: Option<i64>,
100) -> Result<Option<JobRow>, AwaError>
101where
102 E: PgExecutor<'e>,
103{
104 let unique_key = crate::unique::compute_unique_key(kind, queue, args, period_bucket);
105
106 let row = sqlx::query_as::<_, JobRow>(
110 r#"
111 WITH candidates AS (
112 SELECT id FROM awa.jobs_hot
113 WHERE unique_key = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
114 UNION ALL
115 SELECT id FROM awa.scheduled_jobs
116 WHERE unique_key = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
117 ORDER BY id ASC
118 LIMIT 1
119 )
120 UPDATE awa.jobs
121 SET state = 'cancelled', finalized_at = now(),
122 callback_id = NULL, callback_timeout_at = NULL,
123 callback_filter = NULL, callback_on_complete = NULL,
124 callback_on_fail = NULL, callback_transform = NULL
125 FROM candidates
126 WHERE awa.jobs.id = candidates.id
127 RETURNING awa.jobs.*
128 "#,
129 )
130 .bind(&unique_key)
131 .fetch_optional(executor)
132 .await?;
133
134 Ok(row)
135}
136
137pub async fn retry_failed_by_kind<'e, E>(executor: E, kind: &str) -> Result<Vec<JobRow>, AwaError>
139where
140 E: PgExecutor<'e>,
141{
142 let rows = sqlx::query_as::<_, JobRow>(
143 r#"
144 UPDATE awa.jobs
145 SET state = 'available', attempt = 0, run_at = now(),
146 finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
147 WHERE kind = $1 AND state = 'failed'
148 RETURNING *
149 "#,
150 )
151 .bind(kind)
152 .fetch_all(executor)
153 .await?;
154
155 Ok(rows)
156}
157
158pub async fn retry_failed_by_queue<'e, E>(executor: E, queue: &str) -> Result<Vec<JobRow>, AwaError>
160where
161 E: PgExecutor<'e>,
162{
163 let rows = sqlx::query_as::<_, JobRow>(
164 r#"
165 UPDATE awa.jobs
166 SET state = 'available', attempt = 0, run_at = now(),
167 finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
168 WHERE queue = $1 AND state = 'failed'
169 RETURNING *
170 "#,
171 )
172 .bind(queue)
173 .fetch_all(executor)
174 .await?;
175
176 Ok(rows)
177}
178
179pub async fn discard_failed<'e, E>(executor: E, kind: &str) -> Result<u64, AwaError>
181where
182 E: PgExecutor<'e>,
183{
184 let result = sqlx::query("DELETE FROM awa.jobs WHERE kind = $1 AND state = 'failed'")
185 .bind(kind)
186 .execute(executor)
187 .await?;
188
189 Ok(result.rows_affected())
190}
191
192pub async fn pause_queue<'e, E>(
194 executor: E,
195 queue: &str,
196 paused_by: Option<&str>,
197) -> Result<(), AwaError>
198where
199 E: PgExecutor<'e>,
200{
201 sqlx::query(
202 r#"
203 INSERT INTO awa.queue_meta (queue, paused, paused_at, paused_by)
204 VALUES ($1, TRUE, now(), $2)
205 ON CONFLICT (queue) DO UPDATE SET paused = TRUE, paused_at = now(), paused_by = $2
206 "#,
207 )
208 .bind(queue)
209 .bind(paused_by)
210 .execute(executor)
211 .await?;
212
213 Ok(())
214}
215
216pub async fn resume_queue<'e, E>(executor: E, queue: &str) -> Result<(), AwaError>
218where
219 E: PgExecutor<'e>,
220{
221 sqlx::query("UPDATE awa.queue_meta SET paused = FALSE WHERE queue = $1")
222 .bind(queue)
223 .execute(executor)
224 .await?;
225
226 Ok(())
227}
228
229pub async fn drain_queue<'e, E>(executor: E, queue: &str) -> Result<u64, AwaError>
231where
232 E: PgExecutor<'e>,
233{
234 let result = sqlx::query(
235 r#"
236 UPDATE awa.jobs
237 SET state = 'cancelled', finalized_at = now(),
238 callback_id = NULL, callback_timeout_at = NULL,
239 callback_filter = NULL, callback_on_complete = NULL,
240 callback_on_fail = NULL, callback_transform = NULL
241 WHERE queue = $1 AND state IN ('available', 'scheduled', 'retryable', 'waiting_external')
242 "#,
243 )
244 .bind(queue)
245 .execute(executor)
246 .await?;
247
248 Ok(result.rows_affected())
249}
250
/// Per-queue job counts and health indicators, as produced by `queue_stats`.
#[derive(Debug, Clone, Serialize)]
pub struct QueueStats {
    /// Queue name.
    pub queue: String,
    /// Sum of `scheduled + available + running + retryable + waiting_external`
    /// (computed in SQL by `queue_stats`).
    pub total_queued: i64,
    /// Number of jobs in the `scheduled` state.
    pub scheduled: i64,
    /// Number of jobs in the `available` state.
    pub available: i64,
    /// Number of jobs in the `retryable` state.
    pub retryable: i64,
    /// Number of jobs in the `running` state.
    pub running: i64,
    /// Number of jobs in the `failed` state.
    pub failed: i64,
    /// Number of jobs in the `waiting_external` state.
    pub waiting_external: i64,
    /// Completed jobs in `awa.jobs_hot` whose `finalized_at` falls within the
    /// last hour; `0` when there are none.
    pub completed_last_hour: i64,
    /// Seconds between now and the earliest `run_at` among `available` jobs
    /// in `awa.jobs_hot`; `None` when the queue has no available jobs there.
    pub lag_seconds: Option<f64>,
    /// Pause flag from `awa.queue_meta`; `false` when the queue has no row.
    pub paused: bool,
}
267
/// Rate-limit settings reported as part of a queue's runtime configuration.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RateLimitSnapshot {
    /// Configured maximum rate. NOTE(review): unit (presumably jobs per
    /// second) is not visible in this file — confirm against the limiter.
    pub max_rate: f64,
    /// Configured burst size.
    pub burst: u32,
}
274
/// Worker-allocation mode reported for a queue.
///
/// Serialized in `snake_case` (`"hard_reserved"`, `"weighted"`).
/// `queue_runtime_summary` only reports an overflow total for queues whose
/// chosen config is [`QueueRuntimeMode::Weighted`].
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum QueueRuntimeMode {
    /// Serialized as `"hard_reserved"`.
    HardReserved,
    /// Serialized as `"weighted"`.
    Weighted,
}
282
/// The configuration a worker instance reports for one queue.
///
/// Compared with `==` across instances by `queue_runtime_summary` to detect
/// configuration mismatches.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueueRuntimeConfigSnapshot {
    /// Worker-allocation mode for the queue.
    pub mode: QueueRuntimeMode,
    /// Optional upper bound on workers for this queue.
    pub max_workers: Option<u32>,
    /// Optional lower bound on workers for this queue.
    pub min_workers: Option<u32>,
    /// Optional weight. NOTE(review): presumably only meaningful in
    /// `Weighted` mode — confirm against the dispatcher.
    pub weight: Option<u32>,
    /// Optional instance-wide worker cap as seen by this queue.
    pub global_max_workers: Option<u32>,
    /// Poll interval in milliseconds.
    pub poll_interval_ms: u64,
    /// Deadline duration in seconds.
    pub deadline_duration_secs: u64,
    /// Priority aging interval in seconds.
    pub priority_aging_interval_secs: u64,
    /// Rate-limit settings, if any.
    pub rate_limit: Option<RateLimitSnapshot>,
}
296
/// One queue's runtime state as reported by a single worker instance.
///
/// A list of these is stored as JSON in the `queues` column of
/// `awa.runtime_instances`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueueRuntimeSnapshot {
    /// Queue name; used as the grouping key by `queue_runtime_summary`.
    pub queue: String,
    /// In-flight job count reported by the instance.
    pub in_flight: u32,
    /// Overflow-held count, when the instance reports one.
    pub overflow_held: Option<u32>,
    /// Configuration the instance is running this queue with.
    pub config: QueueRuntimeConfigSnapshot,
}
305
/// Payload written by `upsert_runtime_snapshot` for one worker instance.
///
/// `last_seen_at` is not part of the input; the database sets it to `now()`
/// on every upsert.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeSnapshotInput {
    /// Identifier of the instance; the upsert conflict key.
    pub instance_id: Uuid,
    /// Host name, if known.
    pub hostname: Option<String>,
    /// Process id.
    pub pid: i32,
    /// Version string reported by the instance.
    pub version: String,
    /// When the instance started.
    pub started_at: DateTime<Utc>,
    /// Interval between snapshots in milliseconds; later used to derive the
    /// staleness cutoff when instances are listed.
    pub snapshot_interval_ms: i64,
    /// Overall health flag.
    pub healthy: bool,
    /// Whether the instance reports a working Postgres connection.
    pub postgres_connected: bool,
    /// Whether the poll loop is reported alive.
    pub poll_loop_alive: bool,
    /// Whether the heartbeat task is reported alive.
    pub heartbeat_alive: bool,
    /// Whether the maintenance task is reported alive.
    pub maintenance_alive: bool,
    /// Whether the instance is shutting down.
    pub shutting_down: bool,
    /// Whether the instance reports itself as leader.
    pub leader: bool,
    /// Instance-wide worker cap, if configured. Stored with an `as i32` cast.
    pub global_max_workers: Option<u32>,
    /// Per-queue runtime state; stored as JSON.
    pub queues: Vec<QueueRuntimeSnapshot>,
}
325
/// A worker instance as read back from `awa.runtime_instances`, with a
/// derived staleness flag.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeInstance {
    /// Identifier of the instance.
    pub instance_id: Uuid,
    /// Host name, if recorded.
    pub hostname: Option<String>,
    /// Process id.
    pub pid: i32,
    /// Version string reported by the instance.
    pub version: String,
    /// When the instance started.
    pub started_at: DateTime<Utc>,
    /// Database timestamp of the most recent snapshot upsert.
    pub last_seen_at: DateTime<Utc>,
    /// Interval between snapshots in milliseconds.
    pub snapshot_interval_ms: i64,
    /// `true` when `last_seen_at` is older than the cutoff derived from
    /// `snapshot_interval_ms` (computed in `from_db_row`, not stored).
    pub stale: bool,
    /// Overall health flag as reported by the instance.
    pub healthy: bool,
    /// Whether the instance reported a working Postgres connection.
    pub postgres_connected: bool,
    /// Whether the poll loop was reported alive.
    pub poll_loop_alive: bool,
    /// Whether the heartbeat task was reported alive.
    pub heartbeat_alive: bool,
    /// Whether the maintenance task was reported alive.
    pub maintenance_alive: bool,
    /// Whether the instance reported that it is shutting down.
    pub shutting_down: bool,
    /// Whether the instance reported itself as leader.
    pub leader: bool,
    /// Instance-wide worker cap, if recorded.
    pub global_max_workers: Option<u32>,
    /// Per-queue runtime state decoded from the JSON `queues` column.
    pub queues: Vec<QueueRuntimeSnapshot>,
}
347
348impl RuntimeInstance {
349 fn stale_cutoff(interval_ms: i64) -> Duration {
350 let interval_ms = max(interval_ms, 1_000);
351 Duration::milliseconds(max(interval_ms.saturating_mul(3), 30_000))
352 }
353
354 fn from_db_row(row: RuntimeInstanceRow, now: DateTime<Utc>) -> Self {
355 let stale = row.last_seen_at + Self::stale_cutoff(row.snapshot_interval_ms) < now;
356 Self {
357 instance_id: row.instance_id,
358 hostname: row.hostname,
359 pid: row.pid,
360 version: row.version,
361 started_at: row.started_at,
362 last_seen_at: row.last_seen_at,
363 snapshot_interval_ms: row.snapshot_interval_ms,
364 stale,
365 healthy: row.healthy,
366 postgres_connected: row.postgres_connected,
367 poll_loop_alive: row.poll_loop_alive,
368 heartbeat_alive: row.heartbeat_alive,
369 maintenance_alive: row.maintenance_alive,
370 shutting_down: row.shutting_down,
371 leader: row.leader,
372 global_max_workers: row.global_max_workers.map(|v| v as u32),
373 queues: row.queues.0,
374 }
375 }
376}
377
/// Cluster-wide roll-up of worker instances, as produced by
/// `runtime_overview`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeOverview {
    /// Number of instance rows found.
    pub total_instances: usize,
    /// Instances that are not stale (`total_instances - stale_instances`).
    pub live_instances: usize,
    /// Instances flagged stale.
    pub stale_instances: usize,
    /// Instances that are both non-stale and report `healthy`.
    pub healthy_instances: usize,
    /// Instances that are both non-stale and report `leader`.
    pub leader_instances: usize,
    /// The individual instances, in the order returned by
    /// `list_runtime_instances`.
    pub instances: Vec<RuntimeInstance>,
}
388
/// Per-queue roll-up across all worker instances, as produced by
/// `queue_runtime_summary`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueRuntimeSummary {
    /// Queue name.
    pub queue: String,
    /// Number of instance snapshots that mention this queue (live or stale).
    pub instance_count: usize,
    /// Of those, how many come from non-stale instances.
    pub live_instances: usize,
    /// Of those, how many come from stale instances.
    pub stale_instances: usize,
    /// Of those, how many come from non-stale instances reporting `healthy`.
    pub healthy_instances: usize,
    /// Sum of `in_flight` over non-stale instances only.
    pub total_in_flight: u64,
    /// Sum of reported `overflow_held` over non-stale instances; `Some` only
    /// when the chosen `config` is in `Weighted` mode.
    pub overflow_held_total: Option<u64>,
    /// `true` when the candidate configs (live instances if any, otherwise
    /// all instances) are not all equal.
    pub config_mismatch: bool,
    /// The first candidate config; candidates are the live instances' configs
    /// when any instance is live, otherwise every instance's config.
    pub config: Option<QueueRuntimeConfigSnapshot>,
}
402
/// Raw row shape of `awa.runtime_instances` as selected by
/// `list_runtime_instances`; converted into [`RuntimeInstance`] by
/// `RuntimeInstance::from_db_row`.
#[derive(Debug, sqlx::FromRow)]
struct RuntimeInstanceRow {
    instance_id: Uuid,
    hostname: Option<String>,
    pid: i32,
    version: String,
    started_at: DateTime<Utc>,
    last_seen_at: DateTime<Utc>,
    snapshot_interval_ms: i64,
    healthy: bool,
    postgres_connected: bool,
    poll_loop_alive: bool,
    heartbeat_alive: bool,
    maintenance_alive: bool,
    shutting_down: bool,
    leader: bool,
    // Read as a signed integer; cast back to `u32` when building the public type.
    global_max_workers: Option<i32>,
    // Decoded from JSON.
    queues: Json<Vec<QueueRuntimeSnapshot>>,
}
422
423pub async fn upsert_runtime_snapshot<'e, E>(
425 executor: E,
426 snapshot: &RuntimeSnapshotInput,
427) -> Result<(), AwaError>
428where
429 E: PgExecutor<'e>,
430{
431 sqlx::query(
432 r#"
433 INSERT INTO awa.runtime_instances (
434 instance_id,
435 hostname,
436 pid,
437 version,
438 started_at,
439 last_seen_at,
440 snapshot_interval_ms,
441 healthy,
442 postgres_connected,
443 poll_loop_alive,
444 heartbeat_alive,
445 maintenance_alive,
446 shutting_down,
447 leader,
448 global_max_workers,
449 queues
450 )
451 VALUES (
452 $1, $2, $3, $4, $5, now(), $6, $7, $8, $9, $10, $11, $12, $13, $14, $15
453 )
454 ON CONFLICT (instance_id) DO UPDATE SET
455 hostname = EXCLUDED.hostname,
456 pid = EXCLUDED.pid,
457 version = EXCLUDED.version,
458 started_at = EXCLUDED.started_at,
459 last_seen_at = now(),
460 snapshot_interval_ms = EXCLUDED.snapshot_interval_ms,
461 healthy = EXCLUDED.healthy,
462 postgres_connected = EXCLUDED.postgres_connected,
463 poll_loop_alive = EXCLUDED.poll_loop_alive,
464 heartbeat_alive = EXCLUDED.heartbeat_alive,
465 maintenance_alive = EXCLUDED.maintenance_alive,
466 shutting_down = EXCLUDED.shutting_down,
467 leader = EXCLUDED.leader,
468 global_max_workers = EXCLUDED.global_max_workers,
469 queues = EXCLUDED.queues
470 "#,
471 )
472 .bind(snapshot.instance_id)
473 .bind(snapshot.hostname.as_deref())
474 .bind(snapshot.pid)
475 .bind(&snapshot.version)
476 .bind(snapshot.started_at)
477 .bind(snapshot.snapshot_interval_ms)
478 .bind(snapshot.healthy)
479 .bind(snapshot.postgres_connected)
480 .bind(snapshot.poll_loop_alive)
481 .bind(snapshot.heartbeat_alive)
482 .bind(snapshot.maintenance_alive)
483 .bind(snapshot.shutting_down)
484 .bind(snapshot.leader)
485 .bind(snapshot.global_max_workers.map(|v| v as i32))
486 .bind(Json(&snapshot.queues))
487 .execute(executor)
488 .await?;
489
490 Ok(())
491}
492
493pub async fn cleanup_runtime_snapshots<'e, E>(
495 executor: E,
496 max_age: Duration,
497) -> Result<u64, AwaError>
498where
499 E: PgExecutor<'e>,
500{
501 let seconds = max(max_age.num_seconds(), 1);
502 let result = sqlx::query(
503 "DELETE FROM awa.runtime_instances WHERE last_seen_at < now() - make_interval(secs => $1)",
504 )
505 .bind(seconds)
506 .execute(executor)
507 .await?;
508
509 Ok(result.rows_affected())
510}
511
512pub async fn list_runtime_instances<'e, E>(executor: E) -> Result<Vec<RuntimeInstance>, AwaError>
514where
515 E: PgExecutor<'e>,
516{
517 let rows = sqlx::query_as::<_, RuntimeInstanceRow>(
518 r#"
519 SELECT
520 instance_id,
521 hostname,
522 pid,
523 version,
524 started_at,
525 last_seen_at,
526 snapshot_interval_ms,
527 healthy,
528 postgres_connected,
529 poll_loop_alive,
530 heartbeat_alive,
531 maintenance_alive,
532 shutting_down,
533 leader,
534 global_max_workers,
535 queues
536 FROM awa.runtime_instances
537 ORDER BY leader DESC, last_seen_at DESC, started_at DESC
538 "#,
539 )
540 .fetch_all(executor)
541 .await?;
542
543 let now = Utc::now();
544 Ok(rows
545 .into_iter()
546 .map(|row| RuntimeInstance::from_db_row(row, now))
547 .collect())
548}
549
550pub async fn runtime_overview<'e, E>(executor: E) -> Result<RuntimeOverview, AwaError>
552where
553 E: PgExecutor<'e>,
554{
555 let instances = list_runtime_instances(executor).await?;
556 let total_instances = instances.len();
557 let stale_instances = instances.iter().filter(|i| i.stale).count();
558 let live_instances = total_instances.saturating_sub(stale_instances);
559 let healthy_instances = instances.iter().filter(|i| !i.stale && i.healthy).count();
560 let leader_instances = instances.iter().filter(|i| !i.stale && i.leader).count();
561
562 Ok(RuntimeOverview {
563 total_instances,
564 live_instances,
565 stale_instances,
566 healthy_instances,
567 leader_instances,
568 instances,
569 })
570}
571
572pub async fn queue_runtime_summary<'e, E>(executor: E) -> Result<Vec<QueueRuntimeSummary>, AwaError>
574where
575 E: PgExecutor<'e>,
576{
577 let instances = list_runtime_instances(executor).await?;
578 let mut by_queue: HashMap<String, Vec<(bool, bool, QueueRuntimeSnapshot)>> = HashMap::new();
579
580 for instance in instances {
581 let is_live = !instance.stale;
582 let is_healthy = is_live && instance.healthy;
583 for queue in instance.queues {
584 by_queue
585 .entry(queue.queue.clone())
586 .or_default()
587 .push((is_live, is_healthy, queue));
588 }
589 }
590
591 let mut summaries: Vec<_> = by_queue
592 .into_iter()
593 .map(|(queue, entries)| {
594 let instance_count = entries.len();
595 let live_instances = entries.iter().filter(|(live, _, _)| *live).count();
596 let stale_instances = instance_count.saturating_sub(live_instances);
597 let healthy_instances = entries.iter().filter(|(_, healthy, _)| *healthy).count();
598 let total_in_flight = entries
599 .iter()
600 .filter(|(live, _, _)| *live)
601 .map(|(_, _, queue)| u64::from(queue.in_flight))
602 .sum();
603
604 let overflow_total: u64 = entries
605 .iter()
606 .filter(|(live, _, _)| *live)
607 .filter_map(|(_, _, queue)| queue.overflow_held.map(u64::from))
608 .sum();
609
610 let live_configs: Vec<_> = entries
611 .iter()
612 .filter(|(live, _, _)| *live)
613 .map(|(_, _, queue)| queue.config.clone())
614 .collect();
615 let config_candidates = if live_configs.is_empty() {
616 entries
617 .iter()
618 .map(|(_, _, queue)| queue.config.clone())
619 .collect::<Vec<_>>()
620 } else {
621 live_configs
622 };
623 let config = config_candidates.first().cloned();
624 let config_mismatch = config_candidates
625 .iter()
626 .skip(1)
627 .any(|candidate| Some(candidate) != config.as_ref());
628
629 QueueRuntimeSummary {
630 queue,
631 instance_count,
632 live_instances,
633 stale_instances,
634 healthy_instances,
635 total_in_flight,
636 overflow_held_total: config
637 .as_ref()
638 .filter(|cfg| cfg.mode == QueueRuntimeMode::Weighted)
639 .map(|_| overflow_total),
640 config_mismatch,
641 config,
642 }
643 })
644 .collect();
645
646 summaries.sort_by(|a, b| a.queue.cmp(&b.queue));
647 Ok(summaries)
648}
649
650pub async fn queue_stats<'e, E>(executor: E) -> Result<Vec<QueueStats>, AwaError>
652where
653 E: PgExecutor<'e>,
654{
655 let rows = sqlx::query_as::<
656 _,
657 (
658 String,
659 i64,
660 i64,
661 i64,
662 i64,
663 i64,
664 i64,
665 i64,
666 i64,
667 Option<f64>,
668 bool,
669 ),
670 >(
671 r#"
672 WITH available_lag AS (
673 SELECT
674 queue,
675 EXTRACT(EPOCH FROM (now() - min(run_at)))::float8 AS lag_seconds
676 FROM awa.jobs_hot
677 WHERE state = 'available'
678 GROUP BY queue
679 ),
680 completed_recent AS (
681 SELECT
682 queue,
683 count(*)::bigint AS completed_last_hour
684 FROM awa.jobs_hot
685 WHERE state = 'completed'
686 AND finalized_at > now() - interval '1 hour'
687 GROUP BY queue
688 )
689 SELECT
690 qs.queue,
691 qs.scheduled + qs.available + qs.running + qs.retryable + qs.waiting_external AS total_queued,
692 qs.scheduled,
693 qs.available,
694 qs.retryable,
695 qs.running,
696 qs.failed,
697 qs.waiting_external,
698 COALESCE(cr.completed_last_hour, 0) AS completed_last_hour,
699 al.lag_seconds,
700 COALESCE(qm.paused, FALSE) AS paused
701 FROM awa.queue_state_counts qs
702 LEFT JOIN available_lag al ON al.queue = qs.queue
703 LEFT JOIN completed_recent cr ON cr.queue = qs.queue
704 LEFT JOIN awa.queue_meta qm ON qm.queue = qs.queue
705 ORDER BY qs.queue
706 "#,
707 )
708 .fetch_all(executor)
709 .await?;
710
711 Ok(rows
712 .into_iter()
713 .map(
714 |(
715 queue,
716 total_queued,
717 scheduled,
718 available,
719 retryable,
720 running,
721 failed,
722 waiting_external,
723 completed_last_hour,
724 lag_seconds,
725 paused,
726 )| QueueStats {
727 queue,
728 total_queued,
729 scheduled,
730 available,
731 retryable,
732 running,
733 failed,
734 waiting_external,
735 completed_last_hour,
736 lag_seconds,
737 paused,
738 },
739 )
740 .collect())
741}
742
/// Optional filters for `list_jobs`. A `None` field imposes no restriction.
#[derive(Debug, Clone, Default, Serialize)]
pub struct ListJobsFilter {
    /// Only jobs in this state.
    pub state: Option<JobState>,
    /// Only jobs of this kind.
    pub kind: Option<String>,
    /// Only jobs in this queue.
    pub queue: Option<String>,
    /// Only jobs whose `tags` array contains this tag.
    pub tag: Option<String>,
    /// Only jobs with an id strictly less than this value.
    pub before_id: Option<i64>,
    /// Maximum number of rows; `list_jobs` uses 100 when unset.
    pub limit: Option<i64>,
}
753
754pub async fn list_jobs<'e, E>(executor: E, filter: &ListJobsFilter) -> Result<Vec<JobRow>, AwaError>
756where
757 E: PgExecutor<'e>,
758{
759 let limit = filter.limit.unwrap_or(100);
760
761 let rows = sqlx::query_as::<_, JobRow>(
762 r#"
763 SELECT * FROM awa.jobs
764 WHERE ($1::awa.job_state IS NULL OR state = $1)
765 AND ($2::text IS NULL OR kind = $2)
766 AND ($3::text IS NULL OR queue = $3)
767 AND ($4::text IS NULL OR tags @> ARRAY[$4]::text[])
768 AND ($5::bigint IS NULL OR id < $5)
769 ORDER BY id DESC
770 LIMIT $6
771 "#,
772 )
773 .bind(filter.state)
774 .bind(&filter.kind)
775 .bind(&filter.queue)
776 .bind(&filter.tag)
777 .bind(filter.before_id)
778 .bind(limit)
779 .fetch_all(executor)
780 .await?;
781
782 Ok(rows)
783}
784
785pub async fn get_job<'e, E>(executor: E, job_id: i64) -> Result<JobRow, AwaError>
787where
788 E: PgExecutor<'e>,
789{
790 let row = sqlx::query_as::<_, JobRow>("SELECT * FROM awa.jobs WHERE id = $1")
791 .bind(job_id)
792 .fetch_optional(executor)
793 .await?;
794
795 row.ok_or(AwaError::JobNotFound { id: job_id })
796}
797
798pub async fn state_counts<'e, E>(executor: E) -> Result<HashMap<JobState, i64>, AwaError>
800where
801 E: PgExecutor<'e>,
802{
803 let rows = sqlx::query_as::<_, (JobState, i64)>(
804 r#"
805 SELECT 'scheduled'::awa.job_state, COALESCE(sum(scheduled), 0)::bigint FROM awa.queue_state_counts
806 UNION ALL
807 SELECT 'available'::awa.job_state, COALESCE(sum(available), 0)::bigint FROM awa.queue_state_counts
808 UNION ALL
809 SELECT 'running'::awa.job_state, COALESCE(sum(running), 0)::bigint FROM awa.queue_state_counts
810 UNION ALL
811 SELECT 'completed'::awa.job_state, COALESCE(sum(completed), 0)::bigint FROM awa.queue_state_counts
812 UNION ALL
813 SELECT 'retryable'::awa.job_state, COALESCE(sum(retryable), 0)::bigint FROM awa.queue_state_counts
814 UNION ALL
815 SELECT 'failed'::awa.job_state, COALESCE(sum(failed), 0)::bigint FROM awa.queue_state_counts
816 UNION ALL
817 SELECT 'cancelled'::awa.job_state, COALESCE(sum(cancelled), 0)::bigint FROM awa.queue_state_counts
818 UNION ALL
819 SELECT 'waiting_external'::awa.job_state, COALESCE(sum(waiting_external), 0)::bigint FROM awa.queue_state_counts
820 "#,
821 )
822 .fetch_all(executor)
823 .await?;
824
825 Ok(rows.into_iter().collect())
826}
827
828pub async fn distinct_kinds<'e, E>(executor: E) -> Result<Vec<String>, AwaError>
830where
831 E: PgExecutor<'e>,
832{
833 let rows = sqlx::query_scalar::<_, String>(
834 "SELECT kind FROM awa.job_kind_catalog WHERE ref_count > 0 ORDER BY kind",
835 )
836 .fetch_all(executor)
837 .await?;
838
839 Ok(rows)
840}
841
842pub async fn distinct_queues<'e, E>(executor: E) -> Result<Vec<String>, AwaError>
844where
845 E: PgExecutor<'e>,
846{
847 let rows = sqlx::query_scalar::<_, String>(
848 "SELECT queue FROM awa.job_queue_catalog WHERE ref_count > 0 ORDER BY queue",
849 )
850 .fetch_all(executor)
851 .await?;
852
853 Ok(rows)
854}
855
856pub async fn bulk_retry<'e, E>(executor: E, ids: &[i64]) -> Result<Vec<JobRow>, AwaError>
858where
859 E: PgExecutor<'e>,
860{
861 let rows = sqlx::query_as::<_, JobRow>(
862 r#"
863 UPDATE awa.jobs
864 SET state = 'available', attempt = 0, run_at = now(),
865 finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
866 callback_id = NULL, callback_timeout_at = NULL,
867 callback_filter = NULL, callback_on_complete = NULL,
868 callback_on_fail = NULL, callback_transform = NULL
869 WHERE id = ANY($1) AND state IN ('failed', 'cancelled', 'waiting_external')
870 RETURNING *
871 "#,
872 )
873 .bind(ids)
874 .fetch_all(executor)
875 .await?;
876
877 Ok(rows)
878}
879
880pub async fn bulk_cancel<'e, E>(executor: E, ids: &[i64]) -> Result<Vec<JobRow>, AwaError>
882where
883 E: PgExecutor<'e>,
884{
885 let rows = sqlx::query_as::<_, JobRow>(
886 r#"
887 UPDATE awa.jobs
888 SET state = 'cancelled', finalized_at = now(),
889 callback_id = NULL, callback_timeout_at = NULL,
890 callback_filter = NULL, callback_on_complete = NULL,
891 callback_on_fail = NULL, callback_transform = NULL
892 WHERE id = ANY($1) AND state NOT IN ('completed', 'failed', 'cancelled')
893 RETURNING *
894 "#,
895 )
896 .bind(ids)
897 .fetch_all(executor)
898 .await?;
899
900 Ok(rows)
901}
902
/// One point of the series returned by `state_timeseries`.
#[derive(Debug, Clone, Serialize)]
pub struct StateTimeseriesBucket {
    /// Start of the one-minute bucket (`created_at` truncated to the minute).
    pub bucket: chrono::DateTime<chrono::Utc>,
    /// Job state the count applies to.
    pub state: JobState,
    /// Number of jobs created in this bucket that are in `state`.
    pub count: i64,
}
910
911pub async fn state_timeseries<'e, E>(
913 executor: E,
914 minutes: i32,
915) -> Result<Vec<StateTimeseriesBucket>, AwaError>
916where
917 E: PgExecutor<'e>,
918{
919 let rows = sqlx::query_as::<_, (chrono::DateTime<chrono::Utc>, JobState, i64)>(
920 r#"
921 SELECT
922 date_trunc('minute', created_at) AS bucket,
923 state,
924 count(*) AS count
925 FROM awa.jobs
926 WHERE created_at >= now() - make_interval(mins => $1)
927 GROUP BY bucket, state
928 ORDER BY bucket
929 "#,
930 )
931 .bind(minutes)
932 .fetch_all(executor)
933 .await?;
934
935 Ok(rows
936 .into_iter()
937 .map(|(bucket, state, count)| StateTimeseriesBucket {
938 bucket,
939 state,
940 count,
941 })
942 .collect())
943}
944
945pub async fn register_callback<'e, E>(
954 executor: E,
955 job_id: i64,
956 run_lease: i64,
957 timeout: std::time::Duration,
958) -> Result<Uuid, AwaError>
959where
960 E: PgExecutor<'e>,
961{
962 let callback_id = Uuid::new_v4();
963 let timeout_secs = timeout.as_secs_f64();
964 let result = sqlx::query(
965 r#"UPDATE awa.jobs
966 SET callback_id = $2,
967 callback_timeout_at = now() + make_interval(secs => $3),
968 callback_filter = NULL,
969 callback_on_complete = NULL,
970 callback_on_fail = NULL,
971 callback_transform = NULL
972 WHERE id = $1 AND state = 'running' AND run_lease = $4"#,
973 )
974 .bind(job_id)
975 .bind(callback_id)
976 .bind(timeout_secs)
977 .bind(run_lease)
978 .execute(executor)
979 .await?;
980 if result.rows_affected() == 0 {
981 return Err(AwaError::Validation("job is not in running state".into()));
982 }
983 Ok(callback_id)
984}
985
986pub async fn complete_external<'e, E>(
996 executor: E,
997 callback_id: Uuid,
998 _payload: Option<serde_json::Value>,
999) -> Result<JobRow, AwaError>
1000where
1001 E: PgExecutor<'e>,
1002{
1003 let row = sqlx::query_as::<_, JobRow>(
1004 r#"
1005 UPDATE awa.jobs
1006 SET state = 'completed',
1007 finalized_at = now(),
1008 callback_id = NULL,
1009 callback_timeout_at = NULL,
1010 callback_filter = NULL,
1011 callback_on_complete = NULL,
1012 callback_on_fail = NULL,
1013 callback_transform = NULL,
1014 heartbeat_at = NULL,
1015 deadline_at = NULL,
1016 progress = NULL
1017 WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
1018 RETURNING *
1019 "#,
1020 )
1021 .bind(callback_id)
1022 .fetch_optional(executor)
1023 .await?;
1024
1025 row.ok_or(AwaError::CallbackNotFound {
1026 callback_id: callback_id.to_string(),
1027 })
1028}
1029
1030pub async fn fail_external<'e, E>(
1034 executor: E,
1035 callback_id: Uuid,
1036 error: &str,
1037) -> Result<JobRow, AwaError>
1038where
1039 E: PgExecutor<'e>,
1040{
1041 let row = sqlx::query_as::<_, JobRow>(
1042 r#"
1043 UPDATE awa.jobs
1044 SET state = 'failed',
1045 finalized_at = now(),
1046 callback_id = NULL,
1047 callback_timeout_at = NULL,
1048 callback_filter = NULL,
1049 callback_on_complete = NULL,
1050 callback_on_fail = NULL,
1051 callback_transform = NULL,
1052 heartbeat_at = NULL,
1053 deadline_at = NULL,
1054 errors = errors || jsonb_build_object(
1055 'error', $2::text,
1056 'attempt', attempt,
1057 'at', now()
1058 )::jsonb
1059 WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
1060 RETURNING *
1061 "#,
1062 )
1063 .bind(callback_id)
1064 .bind(error)
1065 .fetch_optional(executor)
1066 .await?;
1067
1068 row.ok_or(AwaError::CallbackNotFound {
1069 callback_id: callback_id.to_string(),
1070 })
1071}
1072
1073pub async fn retry_external<'e, E>(executor: E, callback_id: Uuid) -> Result<JobRow, AwaError>
1083where
1084 E: PgExecutor<'e>,
1085{
1086 let row = sqlx::query_as::<_, JobRow>(
1087 r#"
1088 UPDATE awa.jobs
1089 SET state = 'available',
1090 attempt = 0,
1091 run_at = now(),
1092 finalized_at = NULL,
1093 callback_id = NULL,
1094 callback_timeout_at = NULL,
1095 callback_filter = NULL,
1096 callback_on_complete = NULL,
1097 callback_on_fail = NULL,
1098 callback_transform = NULL,
1099 heartbeat_at = NULL,
1100 deadline_at = NULL
1101 WHERE callback_id = $1 AND state = 'waiting_external'
1102 RETURNING *
1103 "#,
1104 )
1105 .bind(callback_id)
1106 .fetch_optional(executor)
1107 .await?;
1108
1109 row.ok_or(AwaError::CallbackNotFound {
1110 callback_id: callback_id.to_string(),
1111 })
1112}
1113
/// Optional expressions attached to an external callback.
///
/// `register_callback_with_config` stores each field in the matching
/// `callback_*` column of the job. When the `cel` feature is enabled the
/// expressions are compiled as CEL and may only reference the `payload`
/// variable; without the feature any non-empty config is rejected.
#[derive(Debug, Clone, Default)]
pub struct CallbackConfig {
    /// Stored in `callback_filter`.
    pub filter: Option<String>,
    /// Stored in `callback_on_complete`.
    pub on_complete: Option<String>,
    /// Stored in `callback_on_fail`.
    pub on_fail: Option<String>,
    /// Stored in `callback_transform`.
    pub transform: Option<String>,
}

impl CallbackConfig {
    /// Returns `true` when none of the four expressions is set.
    pub fn is_empty(&self) -> bool {
        [
            &self.filter,
            &self.on_complete,
            &self.on_fail,
            &self.transform,
        ]
        .iter()
        .all(|expression| expression.is_none())
    }
}
1141
/// What `resolve_callback` should do when the job has no callback
/// expressions configured.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DefaultAction {
    /// Complete the job, passing the payload through unchanged.
    Complete,
    /// Fail the job with a generic "default action" error.
    Fail,
    /// Leave the job untouched.
    Ignore,
}
1150
1151#[derive(Debug)]
1153pub enum ResolveOutcome {
1154 Completed {
1155 payload: Option<serde_json::Value>,
1156 job: JobRow,
1157 },
1158 Failed {
1159 job: JobRow,
1160 },
1161 Ignored {
1162 reason: String,
1163 },
1164}
1165
1166impl ResolveOutcome {
1167 pub fn is_completed(&self) -> bool {
1168 matches!(self, ResolveOutcome::Completed { .. })
1169 }
1170 pub fn is_failed(&self) -> bool {
1171 matches!(self, ResolveOutcome::Failed { .. })
1172 }
1173 pub fn is_ignored(&self) -> bool {
1174 matches!(self, ResolveOutcome::Ignored { .. })
1175 }
1176}
1177
1178pub async fn register_callback_with_config<'e, E>(
1187 executor: E,
1188 job_id: i64,
1189 run_lease: i64,
1190 timeout: std::time::Duration,
1191 config: &CallbackConfig,
1192) -> Result<Uuid, AwaError>
1193where
1194 E: PgExecutor<'e>,
1195{
1196 #[cfg(feature = "cel")]
1198 {
1199 for (name, expr) in [
1200 ("filter", &config.filter),
1201 ("on_complete", &config.on_complete),
1202 ("on_fail", &config.on_fail),
1203 ("transform", &config.transform),
1204 ] {
1205 if let Some(src) = expr {
1206 let program = cel::Program::compile(src).map_err(|e| {
1207 AwaError::Validation(format!("invalid CEL expression for {name}: {e}"))
1208 })?;
1209
1210 let refs = program.references();
1214 let bad_vars: Vec<&str> = refs
1215 .variables()
1216 .into_iter()
1217 .filter(|v| *v != "payload")
1218 .collect();
1219 if !bad_vars.is_empty() {
1220 return Err(AwaError::Validation(format!(
1221 "CEL expression for {name} references undeclared variable(s): {}; \
1222 only 'payload' is available",
1223 bad_vars.join(", ")
1224 )));
1225 }
1226 }
1227 }
1228 }
1229
1230 #[cfg(not(feature = "cel"))]
1231 {
1232 if !config.is_empty() {
1233 return Err(AwaError::Validation(
1234 "CEL expressions require the 'cel' feature".into(),
1235 ));
1236 }
1237 }
1238
1239 let callback_id = Uuid::new_v4();
1240 let timeout_secs = timeout.as_secs_f64();
1241
1242 let result = sqlx::query(
1243 r#"UPDATE awa.jobs
1244 SET callback_id = $2,
1245 callback_timeout_at = now() + make_interval(secs => $3),
1246 callback_filter = $4,
1247 callback_on_complete = $5,
1248 callback_on_fail = $6,
1249 callback_transform = $7
1250 WHERE id = $1 AND state = 'running' AND run_lease = $8"#,
1251 )
1252 .bind(job_id)
1253 .bind(callback_id)
1254 .bind(timeout_secs)
1255 .bind(&config.filter)
1256 .bind(&config.on_complete)
1257 .bind(&config.on_fail)
1258 .bind(&config.transform)
1259 .bind(run_lease)
1260 .execute(executor)
1261 .await?;
1262
1263 if result.rows_affected() == 0 {
1264 return Err(AwaError::Validation("job is not in running state".into()));
1265 }
1266 Ok(callback_id)
1267}
1268
/// Internal decision produced before `resolve_callback` touches the job row.
enum ResolveAction {
    /// Complete the job; carries the payload to hand back to the caller.
    Complete(Option<serde_json::Value>),
    /// Fail the job.
    Fail {
        /// Error message recorded in the job's `errors`.
        error: String,
        /// Expression text to record alongside the error, if any.
        expression: Option<String>,
    },
    /// Leave the job untouched; carries the reason.
    Ignore(String),
}
1278
1279pub async fn resolve_callback(
1285 pool: &PgPool,
1286 callback_id: Uuid,
1287 payload: Option<serde_json::Value>,
1288 default_action: DefaultAction,
1289) -> Result<ResolveOutcome, AwaError> {
1290 let mut tx = pool.begin().await?;
1291
1292 let job = sqlx::query_as::<_, JobRow>(
1297 "SELECT * FROM awa.jobs_hot WHERE callback_id = $1
1298 AND state = 'waiting_external'
1299 FOR UPDATE",
1300 )
1301 .bind(callback_id)
1302 .fetch_optional(&mut *tx)
1303 .await?
1304 .ok_or(AwaError::CallbackNotFound {
1305 callback_id: callback_id.to_string(),
1306 })?;
1307
1308 let action = evaluate_or_default(&job, &payload, default_action)?;
1309
1310 match action {
1311 ResolveAction::Complete(transformed_payload) => {
1312 let completed_job = sqlx::query_as::<_, JobRow>(
1313 r#"
1314 UPDATE awa.jobs
1315 SET state = 'completed',
1316 finalized_at = now(),
1317 callback_id = NULL,
1318 callback_timeout_at = NULL,
1319 callback_filter = NULL,
1320 callback_on_complete = NULL,
1321 callback_on_fail = NULL,
1322 callback_transform = NULL,
1323 heartbeat_at = NULL,
1324 deadline_at = NULL,
1325 progress = NULL
1326 WHERE id = $1
1327 RETURNING *
1328 "#,
1329 )
1330 .bind(job.id)
1331 .fetch_one(&mut *tx)
1332 .await?;
1333
1334 tx.commit().await?;
1335 Ok(ResolveOutcome::Completed {
1336 payload: transformed_payload,
1337 job: completed_job,
1338 })
1339 }
1340 ResolveAction::Fail { error, expression } => {
1341 let mut error_json = serde_json::json!({
1342 "error": error,
1343 "attempt": job.attempt,
1344 "at": chrono::Utc::now().to_rfc3339(),
1345 });
1346 if let Some(expr) = expression {
1347 error_json["expression"] = serde_json::Value::String(expr);
1348 }
1349
1350 let failed_job = sqlx::query_as::<_, JobRow>(
1351 r#"
1352 UPDATE awa.jobs
1353 SET state = 'failed',
1354 finalized_at = now(),
1355 callback_id = NULL,
1356 callback_timeout_at = NULL,
1357 callback_filter = NULL,
1358 callback_on_complete = NULL,
1359 callback_on_fail = NULL,
1360 callback_transform = NULL,
1361 heartbeat_at = NULL,
1362 deadline_at = NULL,
1363 errors = errors || $2::jsonb
1364 WHERE id = $1
1365 RETURNING *
1366 "#,
1367 )
1368 .bind(job.id)
1369 .bind(error_json)
1370 .fetch_one(&mut *tx)
1371 .await?;
1372
1373 tx.commit().await?;
1374 Ok(ResolveOutcome::Failed { job: failed_job })
1375 }
1376 ResolveAction::Ignore(reason) => {
1377 Ok(ResolveOutcome::Ignored { reason })
1379 }
1380 }
1381}
1382
1383fn evaluate_or_default(
1385 job: &JobRow,
1386 payload: &Option<serde_json::Value>,
1387 default_action: DefaultAction,
1388) -> Result<ResolveAction, AwaError> {
1389 let has_expressions = job.callback_filter.is_some()
1390 || job.callback_on_complete.is_some()
1391 || job.callback_on_fail.is_some()
1392 || job.callback_transform.is_some();
1393
1394 if !has_expressions {
1395 return Ok(apply_default(default_action, payload));
1396 }
1397
1398 #[cfg(feature = "cel")]
1399 {
1400 Ok(evaluate_cel(job, payload, default_action))
1401 }
1402
1403 #[cfg(not(feature = "cel"))]
1404 {
1405 let _ = (payload, default_action);
1408 Err(AwaError::Validation(
1409 "CEL expressions present but 'cel' feature is not enabled".into(),
1410 ))
1411 }
1412}
1413
1414fn apply_default(
1415 default_action: DefaultAction,
1416 payload: &Option<serde_json::Value>,
1417) -> ResolveAction {
1418 match default_action {
1419 DefaultAction::Complete => ResolveAction::Complete(payload.clone()),
1420 DefaultAction::Fail => ResolveAction::Fail {
1421 error: "callback failed: default action".to_string(),
1422 expression: None,
1423 },
1424 DefaultAction::Ignore => {
1425 ResolveAction::Ignore("no expressions configured, default is ignore".to_string())
1426 }
1427 }
1428}
1429
/// Evaluates the job's CEL expressions against the callback payload.
///
/// Steps, in order:
///
/// 1. `callback_filter` — an explicit `false` ignores the callback.
/// 2. `callback_on_fail` — an explicit `true` fails the job.
/// 3. `callback_on_complete` — an explicit `true` completes the job with the
///    payload passed through `apply_transform`.
/// 4. Otherwise `default_action` is applied to the original `payload`.
///
/// An expression that cannot be evaluated (`eval_bool` returns `Err`) never
/// triggers its branch; evaluation simply moves on to the next step.
/// A missing payload is exposed to the expressions as JSON `null`.
#[cfg(feature = "cel")]
fn evaluate_cel(
    job: &JobRow,
    payload: &Option<serde_json::Value>,
    default_action: DefaultAction,
) -> ResolveAction {
    let payload_value = payload.clone().unwrap_or(serde_json::Value::Null);

    if let Some(filter_expr) = &job.callback_filter {
        let verdict = eval_bool(filter_expr, &payload_value, job.id, "filter");
        // Only a definite `false` rejects; `true` and errors both pass the gate.
        if matches!(verdict, Ok(false)) {
            return ResolveAction::Ignore("filter expression returned false".to_string());
        }
    }

    if let Some(on_fail_expr) = &job.callback_on_fail {
        let verdict = eval_bool(on_fail_expr, &payload_value, job.id, "on_fail");
        if matches!(verdict, Ok(true)) {
            return ResolveAction::Fail {
                error: "callback failed: on_fail expression matched".to_string(),
                expression: Some(on_fail_expr.clone()),
            };
        }
    }

    if let Some(on_complete_expr) = &job.callback_on_complete {
        let verdict = eval_bool(on_complete_expr, &payload_value, job.id, "on_complete");
        if matches!(verdict, Ok(true)) {
            return ResolveAction::Complete(Some(apply_transform(job, &payload_value)));
        }
    }

    apply_default(default_action, payload)
}
1485
/// Compiles and runs a CEL `expression` that is expected to yield a boolean.
///
/// The payload is bound to the CEL variable `payload`. `job_id` and
/// `expression_name` are only used as fields on the warning logs.
///
/// Returns `Ok(b)` when the expression evaluates to a bool. Every failure —
/// compilation error, failure to bind the payload, execution error, or a
/// non-bool result — is logged at `warn` level and reported as `Err(())`.
#[cfg(feature = "cel")]
fn eval_bool(
    expression: &str,
    payload_value: &serde_json::Value,
    job_id: i64,
    expression_name: &str,
) -> Result<bool, ()> {
    let program = cel::Program::compile(expression).map_err(|e| {
        tracing::warn!(
            job_id,
            expression_name,
            expression,
            error = %e,
            "CEL compilation error during evaluation"
        );
    })?;

    let mut context = cel::Context::default();
    context
        .add_variable("payload", payload_value.clone())
        .map_err(|e| {
            tracing::warn!(
                job_id,
                expression_name,
                error = %e,
                "Failed to add payload variable to CEL context"
            );
        })?;

    let outcome = program.execute(&context).map_err(|e| {
        tracing::warn!(
            job_id,
            expression_name,
            expression,
            error = %e,
            "CEL execution error"
        );
    })?;

    match outcome {
        cel::Value::Bool(b) => Ok(b),
        other => {
            tracing::warn!(
                job_id,
                expression_name,
                expression,
                result_type = ?other.type_of(),
                "CEL expression returned non-bool"
            );
            Err(())
        }
    }
}
1542
/// Applies the job's `callback_transform` CEL expression to the payload.
///
/// The payload is bound to the CEL variable `payload` and the expression's
/// result is converted to JSON. This is best-effort: when the job has no
/// transform, or when compiling, binding, executing, or converting fails,
/// a clone of the original `payload_value` is returned (failures are logged
/// at `warn` level).
#[cfg(feature = "cel")]
fn apply_transform(job: &JobRow, payload_value: &serde_json::Value) -> serde_json::Value {
    let Some(transform_expr) = &job.callback_transform else {
        return payload_value.clone();
    };

    let compiled = cel::Program::compile(transform_expr);
    let program = match compiled {
        Err(e) => {
            tracing::warn!(
                job_id = job.id,
                expression = transform_expr,
                error = %e,
                "CEL transform compilation error, using original payload"
            );
            return payload_value.clone();
        }
        Ok(program) => program,
    };

    let mut context = cel::Context::default();
    let bound = context.add_variable("payload", payload_value.clone());
    if let Err(e) = bound {
        tracing::warn!(
            job_id = job.id,
            error = %e,
            "Failed to add payload variable for transform"
        );
        return payload_value.clone();
    }

    match program.execute(&context) {
        Err(e) => {
            tracing::warn!(
                job_id = job.id,
                expression = transform_expr,
                error = %e,
                "CEL transform execution error, using original payload"
            );
            payload_value.clone()
        }
        Ok(value) => value.json().unwrap_or_else(|e| {
            tracing::warn!(
                job_id = job.id,
                expression = transform_expr,
                error = %e,
                "CEL transform result could not be converted to JSON, using original payload"
            );
            payload_value.clone()
        }),
    }
}