1use crate::error::AwaError;
2use crate::job::{JobRow, JobState};
3use chrono::{DateTime, Duration, Utc};
4use serde::{Deserialize, Serialize};
5use sqlx::types::Json;
6use sqlx::PgExecutor;
7use sqlx::PgPool;
8use std::cmp::max;
9use std::collections::HashMap;
10use uuid::Uuid;
11
12pub async fn retry<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
14where
15 E: PgExecutor<'e>,
16{
17 sqlx::query_as::<_, JobRow>(
18 r#"
19 UPDATE awa.jobs
20 SET state = 'available', attempt = 0, run_at = now(),
21 finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
22 callback_id = NULL, callback_timeout_at = NULL,
23 callback_filter = NULL, callback_on_complete = NULL,
24 callback_on_fail = NULL, callback_transform = NULL
25 WHERE id = $1 AND state IN ('failed', 'cancelled', 'waiting_external')
26 RETURNING *
27 "#,
28 )
29 .bind(job_id)
30 .fetch_optional(executor)
31 .await?
32 .ok_or(AwaError::JobNotFound { id: job_id })
33 .map(Some)
34}
35
36pub async fn cancel<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
38where
39 E: PgExecutor<'e>,
40{
41 sqlx::query_as::<_, JobRow>(
42 r#"
43 UPDATE awa.jobs
44 SET state = 'cancelled', finalized_at = now(),
45 callback_id = NULL, callback_timeout_at = NULL,
46 callback_filter = NULL, callback_on_complete = NULL,
47 callback_on_fail = NULL, callback_transform = NULL
48 WHERE id = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
49 RETURNING *
50 "#,
51 )
52 .bind(job_id)
53 .fetch_optional(executor)
54 .await?
55 .ok_or(AwaError::JobNotFound { id: job_id })
56 .map(Some)
57}
58
59pub async fn cancel_by_unique_key<'e, E>(
95 executor: E,
96 kind: &str,
97 queue: Option<&str>,
98 args: Option<&serde_json::Value>,
99 period_bucket: Option<i64>,
100) -> Result<Option<JobRow>, AwaError>
101where
102 E: PgExecutor<'e>,
103{
104 let unique_key = crate::unique::compute_unique_key(kind, queue, args, period_bucket);
105
106 let row = sqlx::query_as::<_, JobRow>(
110 r#"
111 WITH candidates AS (
112 SELECT id FROM awa.jobs_hot
113 WHERE unique_key = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
114 UNION ALL
115 SELECT id FROM awa.scheduled_jobs
116 WHERE unique_key = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
117 ORDER BY id ASC
118 LIMIT 1
119 )
120 UPDATE awa.jobs
121 SET state = 'cancelled', finalized_at = now(),
122 callback_id = NULL, callback_timeout_at = NULL,
123 callback_filter = NULL, callback_on_complete = NULL,
124 callback_on_fail = NULL, callback_transform = NULL
125 FROM candidates
126 WHERE awa.jobs.id = candidates.id
127 RETURNING awa.jobs.*
128 "#,
129 )
130 .bind(&unique_key)
131 .fetch_optional(executor)
132 .await?;
133
134 Ok(row)
135}
136
137pub async fn retry_failed_by_kind<'e, E>(executor: E, kind: &str) -> Result<Vec<JobRow>, AwaError>
139where
140 E: PgExecutor<'e>,
141{
142 let rows = sqlx::query_as::<_, JobRow>(
143 r#"
144 UPDATE awa.jobs
145 SET state = 'available', attempt = 0, run_at = now(),
146 finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
147 WHERE kind = $1 AND state = 'failed'
148 RETURNING *
149 "#,
150 )
151 .bind(kind)
152 .fetch_all(executor)
153 .await?;
154
155 Ok(rows)
156}
157
158pub async fn retry_failed_by_queue<'e, E>(executor: E, queue: &str) -> Result<Vec<JobRow>, AwaError>
160where
161 E: PgExecutor<'e>,
162{
163 let rows = sqlx::query_as::<_, JobRow>(
164 r#"
165 UPDATE awa.jobs
166 SET state = 'available', attempt = 0, run_at = now(),
167 finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
168 WHERE queue = $1 AND state = 'failed'
169 RETURNING *
170 "#,
171 )
172 .bind(queue)
173 .fetch_all(executor)
174 .await?;
175
176 Ok(rows)
177}
178
179pub async fn discard_failed<'e, E>(executor: E, kind: &str) -> Result<u64, AwaError>
181where
182 E: PgExecutor<'e>,
183{
184 let result = sqlx::query("DELETE FROM awa.jobs WHERE kind = $1 AND state = 'failed'")
185 .bind(kind)
186 .execute(executor)
187 .await?;
188
189 Ok(result.rows_affected())
190}
191
192pub async fn pause_queue<'e, E>(
194 executor: E,
195 queue: &str,
196 paused_by: Option<&str>,
197) -> Result<(), AwaError>
198where
199 E: PgExecutor<'e>,
200{
201 sqlx::query(
202 r#"
203 INSERT INTO awa.queue_meta (queue, paused, paused_at, paused_by)
204 VALUES ($1, TRUE, now(), $2)
205 ON CONFLICT (queue) DO UPDATE SET paused = TRUE, paused_at = now(), paused_by = $2
206 "#,
207 )
208 .bind(queue)
209 .bind(paused_by)
210 .execute(executor)
211 .await?;
212
213 Ok(())
214}
215
216pub async fn resume_queue<'e, E>(executor: E, queue: &str) -> Result<(), AwaError>
218where
219 E: PgExecutor<'e>,
220{
221 sqlx::query("UPDATE awa.queue_meta SET paused = FALSE WHERE queue = $1")
222 .bind(queue)
223 .execute(executor)
224 .await?;
225
226 Ok(())
227}
228
229pub async fn drain_queue<'e, E>(executor: E, queue: &str) -> Result<u64, AwaError>
231where
232 E: PgExecutor<'e>,
233{
234 let result = sqlx::query(
235 r#"
236 UPDATE awa.jobs
237 SET state = 'cancelled', finalized_at = now(),
238 callback_id = NULL, callback_timeout_at = NULL,
239 callback_filter = NULL, callback_on_complete = NULL,
240 callback_on_fail = NULL, callback_transform = NULL
241 WHERE queue = $1 AND state IN ('available', 'scheduled', 'retryable', 'waiting_external')
242 "#,
243 )
244 .bind(queue)
245 .execute(executor)
246 .await?;
247
248 Ok(result.rows_affected())
249}
250
251#[derive(Debug, Clone, Serialize)]
253pub struct QueueStats {
254 pub queue: String,
255 pub total_queued: i64,
257 pub scheduled: i64,
258 pub available: i64,
259 pub retryable: i64,
260 pub running: i64,
261 pub failed: i64,
262 pub waiting_external: i64,
263 pub completed_last_hour: i64,
264 pub lag_seconds: Option<f64>,
265 pub paused: bool,
266}
267
268#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
270pub struct RateLimitSnapshot {
271 pub max_rate: f64,
272 pub burst: u32,
273}
274
275#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
277#[serde(rename_all = "snake_case")]
278pub enum QueueRuntimeMode {
279 HardReserved,
280 Weighted,
281}
282
283#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
285pub struct QueueRuntimeConfigSnapshot {
286 pub mode: QueueRuntimeMode,
287 pub max_workers: Option<u32>,
288 pub min_workers: Option<u32>,
289 pub weight: Option<u32>,
290 pub global_max_workers: Option<u32>,
291 pub poll_interval_ms: u64,
292 pub deadline_duration_secs: u64,
293 pub priority_aging_interval_secs: u64,
294 pub rate_limit: Option<RateLimitSnapshot>,
295}
296
297#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
299pub struct QueueRuntimeSnapshot {
300 pub queue: String,
301 pub in_flight: u32,
302 pub overflow_held: Option<u32>,
303 pub config: QueueRuntimeConfigSnapshot,
304}
305
306#[derive(Debug, Clone, Serialize, Deserialize)]
308pub struct RuntimeSnapshotInput {
309 pub instance_id: Uuid,
310 pub hostname: Option<String>,
311 pub pid: i32,
312 pub version: String,
313 pub started_at: DateTime<Utc>,
314 pub snapshot_interval_ms: i64,
315 pub healthy: bool,
316 pub postgres_connected: bool,
317 pub poll_loop_alive: bool,
318 pub heartbeat_alive: bool,
319 pub maintenance_alive: bool,
320 pub shutting_down: bool,
321 pub leader: bool,
322 pub global_max_workers: Option<u32>,
323 pub queues: Vec<QueueRuntimeSnapshot>,
324}
325
326#[derive(Debug, Clone, Serialize, Deserialize)]
328pub struct RuntimeInstance {
329 pub instance_id: Uuid,
330 pub hostname: Option<String>,
331 pub pid: i32,
332 pub version: String,
333 pub started_at: DateTime<Utc>,
334 pub last_seen_at: DateTime<Utc>,
335 pub snapshot_interval_ms: i64,
336 pub stale: bool,
337 pub healthy: bool,
338 pub postgres_connected: bool,
339 pub poll_loop_alive: bool,
340 pub heartbeat_alive: bool,
341 pub maintenance_alive: bool,
342 pub shutting_down: bool,
343 pub leader: bool,
344 pub global_max_workers: Option<u32>,
345 pub queues: Vec<QueueRuntimeSnapshot>,
346}
347
348impl RuntimeInstance {
349 fn stale_cutoff(interval_ms: i64) -> Duration {
350 let interval_ms = max(interval_ms, 1_000);
351 Duration::milliseconds(max(interval_ms.saturating_mul(3), 30_000))
352 }
353
354 fn from_db_row(row: RuntimeInstanceRow, now: DateTime<Utc>) -> Self {
355 let stale = row.last_seen_at + Self::stale_cutoff(row.snapshot_interval_ms) < now;
356 Self {
357 instance_id: row.instance_id,
358 hostname: row.hostname,
359 pid: row.pid,
360 version: row.version,
361 started_at: row.started_at,
362 last_seen_at: row.last_seen_at,
363 snapshot_interval_ms: row.snapshot_interval_ms,
364 stale,
365 healthy: row.healthy,
366 postgres_connected: row.postgres_connected,
367 poll_loop_alive: row.poll_loop_alive,
368 heartbeat_alive: row.heartbeat_alive,
369 maintenance_alive: row.maintenance_alive,
370 shutting_down: row.shutting_down,
371 leader: row.leader,
372 global_max_workers: row.global_max_workers.map(|v| v as u32),
373 queues: row.queues.0,
374 }
375 }
376}
377
378#[derive(Debug, Clone, Serialize, Deserialize)]
380pub struct RuntimeOverview {
381 pub total_instances: usize,
382 pub live_instances: usize,
383 pub stale_instances: usize,
384 pub healthy_instances: usize,
385 pub leader_instances: usize,
386 pub instances: Vec<RuntimeInstance>,
387}
388
389#[derive(Debug, Clone, Serialize, Deserialize)]
391pub struct QueueRuntimeSummary {
392 pub queue: String,
393 pub instance_count: usize,
394 pub live_instances: usize,
395 pub stale_instances: usize,
396 pub healthy_instances: usize,
397 pub total_in_flight: u64,
398 pub overflow_held_total: Option<u64>,
399 pub config_mismatch: bool,
400 pub config: Option<QueueRuntimeConfigSnapshot>,
401}
402
403#[derive(Debug, sqlx::FromRow)]
404struct RuntimeInstanceRow {
405 instance_id: Uuid,
406 hostname: Option<String>,
407 pid: i32,
408 version: String,
409 started_at: DateTime<Utc>,
410 last_seen_at: DateTime<Utc>,
411 snapshot_interval_ms: i64,
412 healthy: bool,
413 postgres_connected: bool,
414 poll_loop_alive: bool,
415 heartbeat_alive: bool,
416 maintenance_alive: bool,
417 shutting_down: bool,
418 leader: bool,
419 global_max_workers: Option<i32>,
420 queues: Json<Vec<QueueRuntimeSnapshot>>,
421}
422
423pub async fn upsert_runtime_snapshot<'e, E>(
425 executor: E,
426 snapshot: &RuntimeSnapshotInput,
427) -> Result<(), AwaError>
428where
429 E: PgExecutor<'e>,
430{
431 sqlx::query(
432 r#"
433 INSERT INTO awa.runtime_instances (
434 instance_id,
435 hostname,
436 pid,
437 version,
438 started_at,
439 last_seen_at,
440 snapshot_interval_ms,
441 healthy,
442 postgres_connected,
443 poll_loop_alive,
444 heartbeat_alive,
445 maintenance_alive,
446 shutting_down,
447 leader,
448 global_max_workers,
449 queues
450 )
451 VALUES (
452 $1, $2, $3, $4, $5, now(), $6, $7, $8, $9, $10, $11, $12, $13, $14, $15
453 )
454 ON CONFLICT (instance_id) DO UPDATE SET
455 hostname = EXCLUDED.hostname,
456 pid = EXCLUDED.pid,
457 version = EXCLUDED.version,
458 started_at = EXCLUDED.started_at,
459 last_seen_at = now(),
460 snapshot_interval_ms = EXCLUDED.snapshot_interval_ms,
461 healthy = EXCLUDED.healthy,
462 postgres_connected = EXCLUDED.postgres_connected,
463 poll_loop_alive = EXCLUDED.poll_loop_alive,
464 heartbeat_alive = EXCLUDED.heartbeat_alive,
465 maintenance_alive = EXCLUDED.maintenance_alive,
466 shutting_down = EXCLUDED.shutting_down,
467 leader = EXCLUDED.leader,
468 global_max_workers = EXCLUDED.global_max_workers,
469 queues = EXCLUDED.queues
470 "#,
471 )
472 .bind(snapshot.instance_id)
473 .bind(snapshot.hostname.as_deref())
474 .bind(snapshot.pid)
475 .bind(&snapshot.version)
476 .bind(snapshot.started_at)
477 .bind(snapshot.snapshot_interval_ms)
478 .bind(snapshot.healthy)
479 .bind(snapshot.postgres_connected)
480 .bind(snapshot.poll_loop_alive)
481 .bind(snapshot.heartbeat_alive)
482 .bind(snapshot.maintenance_alive)
483 .bind(snapshot.shutting_down)
484 .bind(snapshot.leader)
485 .bind(snapshot.global_max_workers.map(|v| v as i32))
486 .bind(Json(&snapshot.queues))
487 .execute(executor)
488 .await?;
489
490 Ok(())
491}
492
493pub async fn cleanup_runtime_snapshots<'e, E>(
495 executor: E,
496 max_age: Duration,
497) -> Result<u64, AwaError>
498where
499 E: PgExecutor<'e>,
500{
501 let seconds = max(max_age.num_seconds(), 1);
502 let result = sqlx::query(
503 "DELETE FROM awa.runtime_instances WHERE last_seen_at < now() - make_interval(secs => $1)",
504 )
505 .bind(seconds)
506 .execute(executor)
507 .await?;
508
509 Ok(result.rows_affected())
510}
511
512pub async fn list_runtime_instances<'e, E>(executor: E) -> Result<Vec<RuntimeInstance>, AwaError>
514where
515 E: PgExecutor<'e>,
516{
517 let rows = sqlx::query_as::<_, RuntimeInstanceRow>(
518 r#"
519 SELECT
520 instance_id,
521 hostname,
522 pid,
523 version,
524 started_at,
525 last_seen_at,
526 snapshot_interval_ms,
527 healthy,
528 postgres_connected,
529 poll_loop_alive,
530 heartbeat_alive,
531 maintenance_alive,
532 shutting_down,
533 leader,
534 global_max_workers,
535 queues
536 FROM awa.runtime_instances
537 ORDER BY leader DESC, last_seen_at DESC, started_at DESC
538 "#,
539 )
540 .fetch_all(executor)
541 .await?;
542
543 let now = Utc::now();
544 Ok(rows
545 .into_iter()
546 .map(|row| RuntimeInstance::from_db_row(row, now))
547 .collect())
548}
549
550pub async fn runtime_overview<'e, E>(executor: E) -> Result<RuntimeOverview, AwaError>
552where
553 E: PgExecutor<'e>,
554{
555 let instances = list_runtime_instances(executor).await?;
556 let total_instances = instances.len();
557 let stale_instances = instances.iter().filter(|i| i.stale).count();
558 let live_instances = total_instances.saturating_sub(stale_instances);
559 let healthy_instances = instances.iter().filter(|i| !i.stale && i.healthy).count();
560 let leader_instances = instances.iter().filter(|i| !i.stale && i.leader).count();
561
562 Ok(RuntimeOverview {
563 total_instances,
564 live_instances,
565 stale_instances,
566 healthy_instances,
567 leader_instances,
568 instances,
569 })
570}
571
572pub async fn queue_runtime_summary<'e, E>(executor: E) -> Result<Vec<QueueRuntimeSummary>, AwaError>
574where
575 E: PgExecutor<'e>,
576{
577 let instances = list_runtime_instances(executor).await?;
578 let mut by_queue: HashMap<String, Vec<(bool, bool, QueueRuntimeSnapshot)>> = HashMap::new();
579
580 for instance in instances {
581 let is_live = !instance.stale;
582 let is_healthy = is_live && instance.healthy;
583 for queue in instance.queues {
584 by_queue
585 .entry(queue.queue.clone())
586 .or_default()
587 .push((is_live, is_healthy, queue));
588 }
589 }
590
591 let mut summaries: Vec<_> = by_queue
592 .into_iter()
593 .map(|(queue, entries)| {
594 let instance_count = entries.len();
595 let live_instances = entries.iter().filter(|(live, _, _)| *live).count();
596 let stale_instances = instance_count.saturating_sub(live_instances);
597 let healthy_instances = entries.iter().filter(|(_, healthy, _)| *healthy).count();
598 let total_in_flight = entries
599 .iter()
600 .filter(|(live, _, _)| *live)
601 .map(|(_, _, queue)| u64::from(queue.in_flight))
602 .sum();
603
604 let overflow_total: u64 = entries
605 .iter()
606 .filter(|(live, _, _)| *live)
607 .filter_map(|(_, _, queue)| queue.overflow_held.map(u64::from))
608 .sum();
609
610 let live_configs: Vec<_> = entries
611 .iter()
612 .filter(|(live, _, _)| *live)
613 .map(|(_, _, queue)| queue.config.clone())
614 .collect();
615 let config_candidates = if live_configs.is_empty() {
616 entries
617 .iter()
618 .map(|(_, _, queue)| queue.config.clone())
619 .collect::<Vec<_>>()
620 } else {
621 live_configs
622 };
623 let config = config_candidates.first().cloned();
624 let config_mismatch = config_candidates
625 .iter()
626 .skip(1)
627 .any(|candidate| Some(candidate) != config.as_ref());
628
629 QueueRuntimeSummary {
630 queue,
631 instance_count,
632 live_instances,
633 stale_instances,
634 healthy_instances,
635 total_in_flight,
636 overflow_held_total: config
637 .as_ref()
638 .filter(|cfg| cfg.mode == QueueRuntimeMode::Weighted)
639 .map(|_| overflow_total),
640 config_mismatch,
641 config,
642 }
643 })
644 .collect();
645
646 summaries.sort_by(|a, b| a.queue.cmp(&b.queue));
647 Ok(summaries)
648}
649
650pub async fn queue_stats<'e, E>(executor: E) -> Result<Vec<QueueStats>, AwaError>
663where
664 E: PgExecutor<'e>,
665{
666 let rows = sqlx::query_as::<
667 _,
668 (
669 String,
670 i64,
671 i64,
672 i64,
673 i64,
674 i64,
675 i64,
676 i64,
677 i64,
678 Option<f64>,
679 bool,
680 ),
681 >(
682 r#"
683 WITH available_lag AS (
684 SELECT
685 queue,
686 EXTRACT(EPOCH FROM (now() - min(run_at)))::float8 AS lag_seconds
687 FROM awa.jobs_hot
688 WHERE state = 'available'
689 GROUP BY queue
690 ),
691 completed_recent AS (
692 SELECT
693 queue,
694 count(*)::bigint AS completed_last_hour
695 FROM awa.jobs_hot
696 WHERE state = 'completed'
697 AND finalized_at > now() - interval '1 hour'
698 GROUP BY queue
699 )
700 SELECT
701 qs.queue,
702 qs.scheduled + qs.available + qs.running + qs.retryable + qs.waiting_external AS total_queued,
703 qs.scheduled,
704 qs.available,
705 qs.retryable,
706 qs.running,
707 qs.failed,
708 qs.waiting_external,
709 COALESCE(cr.completed_last_hour, 0) AS completed_last_hour,
710 al.lag_seconds,
711 COALESCE(qm.paused, FALSE) AS paused
712 FROM awa.queue_state_counts qs
713 LEFT JOIN available_lag al ON al.queue = qs.queue
714 LEFT JOIN completed_recent cr ON cr.queue = qs.queue
715 LEFT JOIN awa.queue_meta qm ON qm.queue = qs.queue
716 ORDER BY qs.queue
717 "#,
718 )
719 .fetch_all(executor)
720 .await?;
721
722 Ok(rows
723 .into_iter()
724 .map(
725 |(
726 queue,
727 total_queued,
728 scheduled,
729 available,
730 retryable,
731 running,
732 failed,
733 waiting_external,
734 completed_last_hour,
735 lag_seconds,
736 paused,
737 )| QueueStats {
738 queue,
739 total_queued,
740 scheduled,
741 available,
742 retryable,
743 running,
744 failed,
745 waiting_external,
746 completed_last_hour,
747 lag_seconds,
748 paused,
749 },
750 )
751 .collect())
752}
753
754#[derive(Debug, Clone, Default, Serialize)]
756pub struct ListJobsFilter {
757 pub state: Option<JobState>,
758 pub kind: Option<String>,
759 pub queue: Option<String>,
760 pub tag: Option<String>,
761 pub before_id: Option<i64>,
762 pub limit: Option<i64>,
763}
764
765pub async fn list_jobs<'e, E>(executor: E, filter: &ListJobsFilter) -> Result<Vec<JobRow>, AwaError>
767where
768 E: PgExecutor<'e>,
769{
770 let limit = filter.limit.unwrap_or(100);
771
772 let rows = sqlx::query_as::<_, JobRow>(
773 r#"
774 SELECT * FROM awa.jobs
775 WHERE ($1::awa.job_state IS NULL OR state = $1)
776 AND ($2::text IS NULL OR kind = $2)
777 AND ($3::text IS NULL OR queue = $3)
778 AND ($4::text IS NULL OR tags @> ARRAY[$4]::text[])
779 AND ($5::bigint IS NULL OR id < $5)
780 ORDER BY id DESC
781 LIMIT $6
782 "#,
783 )
784 .bind(filter.state)
785 .bind(&filter.kind)
786 .bind(&filter.queue)
787 .bind(&filter.tag)
788 .bind(filter.before_id)
789 .bind(limit)
790 .fetch_all(executor)
791 .await?;
792
793 Ok(rows)
794}
795
796pub async fn get_job<'e, E>(executor: E, job_id: i64) -> Result<JobRow, AwaError>
798where
799 E: PgExecutor<'e>,
800{
801 let row = sqlx::query_as::<_, JobRow>("SELECT * FROM awa.jobs WHERE id = $1")
802 .bind(job_id)
803 .fetch_optional(executor)
804 .await?;
805
806 row.ok_or(AwaError::JobNotFound { id: job_id })
807}
808
809pub async fn state_counts<'e, E>(executor: E) -> Result<HashMap<JobState, i64>, AwaError>
813where
814 E: PgExecutor<'e>,
815{
816 let rows = sqlx::query_as::<_, (JobState, i64)>(
819 r#"
820 SELECT v.state, v.total FROM (
821 SELECT
822 COALESCE(sum(scheduled), 0)::bigint AS scheduled,
823 COALESCE(sum(available), 0)::bigint AS available,
824 COALESCE(sum(running), 0)::bigint AS running,
825 COALESCE(sum(completed), 0)::bigint AS completed,
826 COALESCE(sum(retryable), 0)::bigint AS retryable,
827 COALESCE(sum(failed), 0)::bigint AS failed,
828 COALESCE(sum(cancelled), 0)::bigint AS cancelled,
829 COALESCE(sum(waiting_external), 0)::bigint AS waiting_external
830 FROM awa.queue_state_counts
831 ) s,
832 LATERAL (VALUES
833 ('scheduled'::awa.job_state, s.scheduled),
834 ('available'::awa.job_state, s.available),
835 ('running'::awa.job_state, s.running),
836 ('completed'::awa.job_state, s.completed),
837 ('retryable'::awa.job_state, s.retryable),
838 ('failed'::awa.job_state, s.failed),
839 ('cancelled'::awa.job_state, s.cancelled),
840 ('waiting_external'::awa.job_state, s.waiting_external)
841 ) AS v(state, total)
842 "#,
843 )
844 .fetch_all(executor)
845 .await?;
846
847 Ok(rows.into_iter().collect())
848}
849
850pub async fn distinct_kinds<'e, E>(executor: E) -> Result<Vec<String>, AwaError>
854where
855 E: PgExecutor<'e>,
856{
857 let rows = sqlx::query_scalar::<_, String>(
858 "SELECT kind FROM awa.job_kind_catalog WHERE ref_count > 0 ORDER BY kind",
859 )
860 .fetch_all(executor)
861 .await?;
862
863 Ok(rows)
864}
865
866pub async fn distinct_queues<'e, E>(executor: E) -> Result<Vec<String>, AwaError>
870where
871 E: PgExecutor<'e>,
872{
873 let rows = sqlx::query_scalar::<_, String>(
874 "SELECT queue FROM awa.job_queue_catalog WHERE ref_count > 0 ORDER BY queue",
875 )
876 .fetch_all(executor)
877 .await?;
878
879 Ok(rows)
880}
881
882pub async fn recompute_dirty_admin_metadata(pool: &PgPool) -> Result<i32, AwaError> {
888 let count: i32 = sqlx::query_scalar("SELECT awa.recompute_dirty_admin_metadata(100)")
889 .fetch_one(pool)
890 .await?;
891 Ok(count)
892}
893
894pub async fn flush_dirty_admin_metadata(pool: &PgPool) -> Result<i32, AwaError> {
901 let mut total = 0i32;
902 loop {
903 let count: i32 = sqlx::query_scalar("SELECT awa.recompute_dirty_admin_metadata(100)")
904 .fetch_one(pool)
905 .await?;
906 total += count;
907 if count == 0 {
908 break;
909 }
910 }
911 Ok(total)
912}
913
914pub async fn refresh_admin_metadata(pool: &PgPool) -> Result<(), AwaError> {
920 sqlx::query("SELECT awa.refresh_admin_metadata()")
921 .execute(pool)
922 .await?;
923 Ok(())
924}
925
926pub async fn bulk_retry<'e, E>(executor: E, ids: &[i64]) -> Result<Vec<JobRow>, AwaError>
928where
929 E: PgExecutor<'e>,
930{
931 let rows = sqlx::query_as::<_, JobRow>(
932 r#"
933 UPDATE awa.jobs
934 SET state = 'available', attempt = 0, run_at = now(),
935 finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
936 callback_id = NULL, callback_timeout_at = NULL,
937 callback_filter = NULL, callback_on_complete = NULL,
938 callback_on_fail = NULL, callback_transform = NULL
939 WHERE id = ANY($1) AND state IN ('failed', 'cancelled', 'waiting_external')
940 RETURNING *
941 "#,
942 )
943 .bind(ids)
944 .fetch_all(executor)
945 .await?;
946
947 Ok(rows)
948}
949
950pub async fn bulk_cancel<'e, E>(executor: E, ids: &[i64]) -> Result<Vec<JobRow>, AwaError>
952where
953 E: PgExecutor<'e>,
954{
955 let rows = sqlx::query_as::<_, JobRow>(
956 r#"
957 UPDATE awa.jobs
958 SET state = 'cancelled', finalized_at = now(),
959 callback_id = NULL, callback_timeout_at = NULL,
960 callback_filter = NULL, callback_on_complete = NULL,
961 callback_on_fail = NULL, callback_transform = NULL
962 WHERE id = ANY($1) AND state NOT IN ('completed', 'failed', 'cancelled')
963 RETURNING *
964 "#,
965 )
966 .bind(ids)
967 .fetch_all(executor)
968 .await?;
969
970 Ok(rows)
971}
972
973#[derive(Debug, Clone, Serialize)]
975pub struct StateTimeseriesBucket {
976 pub bucket: chrono::DateTime<chrono::Utc>,
977 pub state: JobState,
978 pub count: i64,
979}
980
981pub async fn state_timeseries<'e, E>(
983 executor: E,
984 minutes: i32,
985) -> Result<Vec<StateTimeseriesBucket>, AwaError>
986where
987 E: PgExecutor<'e>,
988{
989 let rows = sqlx::query_as::<_, (chrono::DateTime<chrono::Utc>, JobState, i64)>(
990 r#"
991 SELECT
992 date_trunc('minute', created_at) AS bucket,
993 state,
994 count(*) AS count
995 FROM awa.jobs
996 WHERE created_at >= now() - make_interval(mins => $1)
997 GROUP BY bucket, state
998 ORDER BY bucket
999 "#,
1000 )
1001 .bind(minutes)
1002 .fetch_all(executor)
1003 .await?;
1004
1005 Ok(rows
1006 .into_iter()
1007 .map(|(bucket, state, count)| StateTimeseriesBucket {
1008 bucket,
1009 state,
1010 count,
1011 })
1012 .collect())
1013}
1014
1015pub async fn register_callback<'e, E>(
1024 executor: E,
1025 job_id: i64,
1026 run_lease: i64,
1027 timeout: std::time::Duration,
1028) -> Result<Uuid, AwaError>
1029where
1030 E: PgExecutor<'e>,
1031{
1032 let callback_id = Uuid::new_v4();
1033 let timeout_secs = timeout.as_secs_f64();
1034 let result = sqlx::query(
1035 r#"UPDATE awa.jobs
1036 SET callback_id = $2,
1037 callback_timeout_at = now() + make_interval(secs => $3),
1038 callback_filter = NULL,
1039 callback_on_complete = NULL,
1040 callback_on_fail = NULL,
1041 callback_transform = NULL
1042 WHERE id = $1 AND state = 'running' AND run_lease = $4"#,
1043 )
1044 .bind(job_id)
1045 .bind(callback_id)
1046 .bind(timeout_secs)
1047 .bind(run_lease)
1048 .execute(executor)
1049 .await?;
1050 if result.rows_affected() == 0 {
1051 return Err(AwaError::Validation("job is not in running state".into()));
1052 }
1053 Ok(callback_id)
1054}
1055
1056pub async fn complete_external<'e, E>(
1067 executor: E,
1068 callback_id: Uuid,
1069 payload: Option<serde_json::Value>,
1070 run_lease: Option<i64>,
1071) -> Result<JobRow, AwaError>
1072where
1073 E: PgExecutor<'e>,
1074{
1075 complete_external_inner(executor, callback_id, payload, run_lease, false).await
1076}
1077
1078pub async fn resume_external<'e, E>(
1084 executor: E,
1085 callback_id: Uuid,
1086 payload: Option<serde_json::Value>,
1087 run_lease: Option<i64>,
1088) -> Result<JobRow, AwaError>
1089where
1090 E: PgExecutor<'e>,
1091{
1092 complete_external_inner(executor, callback_id, payload, run_lease, true).await
1093}
1094
1095async fn complete_external_inner<'e, E>(
1096 executor: E,
1097 callback_id: Uuid,
1098 payload: Option<serde_json::Value>,
1099 run_lease: Option<i64>,
1100 resume: bool,
1101) -> Result<JobRow, AwaError>
1102where
1103 E: PgExecutor<'e>,
1104{
1105 let row = if resume {
1106 let payload_json = payload.unwrap_or(serde_json::Value::Null);
1109 sqlx::query_as::<_, JobRow>(
1110 r#"
1111 UPDATE awa.jobs
1112 SET state = 'running',
1113 callback_id = NULL,
1114 callback_timeout_at = NULL,
1115 callback_filter = NULL,
1116 callback_on_complete = NULL,
1117 callback_on_fail = NULL,
1118 callback_transform = NULL,
1119 heartbeat_at = now(),
1120 metadata = metadata || jsonb_build_object('_awa_callback_result', $3::jsonb)
1121 WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
1122 AND ($2::bigint IS NULL OR run_lease = $2)
1123 RETURNING *
1124 "#,
1125 )
1126 .bind(callback_id)
1127 .bind(run_lease)
1128 .bind(&payload_json)
1129 .fetch_optional(executor)
1130 .await?
1131 } else {
1132 sqlx::query_as::<_, JobRow>(
1134 r#"
1135 UPDATE awa.jobs
1136 SET state = 'completed',
1137 finalized_at = now(),
1138 callback_id = NULL,
1139 callback_timeout_at = NULL,
1140 callback_filter = NULL,
1141 callback_on_complete = NULL,
1142 callback_on_fail = NULL,
1143 callback_transform = NULL,
1144 heartbeat_at = NULL,
1145 deadline_at = NULL,
1146 progress = NULL
1147 WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
1148 AND ($2::bigint IS NULL OR run_lease = $2)
1149 RETURNING *
1150 "#,
1151 )
1152 .bind(callback_id)
1153 .bind(run_lease)
1154 .fetch_optional(executor)
1155 .await?
1156 };
1157
1158 row.ok_or(AwaError::CallbackNotFound {
1159 callback_id: callback_id.to_string(),
1160 })
1161}
1162
1163pub async fn fail_external<'e, E>(
1167 executor: E,
1168 callback_id: Uuid,
1169 error: &str,
1170 run_lease: Option<i64>,
1171) -> Result<JobRow, AwaError>
1172where
1173 E: PgExecutor<'e>,
1174{
1175 let row = sqlx::query_as::<_, JobRow>(
1176 r#"
1177 UPDATE awa.jobs
1178 SET state = 'failed',
1179 finalized_at = now(),
1180 callback_id = NULL,
1181 callback_timeout_at = NULL,
1182 callback_filter = NULL,
1183 callback_on_complete = NULL,
1184 callback_on_fail = NULL,
1185 callback_transform = NULL,
1186 heartbeat_at = NULL,
1187 deadline_at = NULL,
1188 errors = errors || jsonb_build_object(
1189 'error', $2::text,
1190 'attempt', attempt,
1191 'at', now()
1192 )::jsonb
1193 WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
1194 AND ($3::bigint IS NULL OR run_lease = $3)
1195 RETURNING *
1196 "#,
1197 )
1198 .bind(callback_id)
1199 .bind(error)
1200 .bind(run_lease)
1201 .fetch_optional(executor)
1202 .await?;
1203
1204 row.ok_or(AwaError::CallbackNotFound {
1205 callback_id: callback_id.to_string(),
1206 })
1207}
1208
1209pub async fn retry_external<'e, E>(
1219 executor: E,
1220 callback_id: Uuid,
1221 run_lease: Option<i64>,
1222) -> Result<JobRow, AwaError>
1223where
1224 E: PgExecutor<'e>,
1225{
1226 let row = sqlx::query_as::<_, JobRow>(
1227 r#"
1228 UPDATE awa.jobs
1229 SET state = 'available',
1230 attempt = 0,
1231 run_at = now(),
1232 finalized_at = NULL,
1233 callback_id = NULL,
1234 callback_timeout_at = NULL,
1235 callback_filter = NULL,
1236 callback_on_complete = NULL,
1237 callback_on_fail = NULL,
1238 callback_transform = NULL,
1239 heartbeat_at = NULL,
1240 deadline_at = NULL
1241 WHERE callback_id = $1 AND state = 'waiting_external'
1242 AND ($2::bigint IS NULL OR run_lease = $2)
1243 RETURNING *
1244 "#,
1245 )
1246 .bind(callback_id)
1247 .bind(run_lease)
1248 .fetch_optional(executor)
1249 .await?;
1250
1251 row.ok_or(AwaError::CallbackNotFound {
1252 callback_id: callback_id.to_string(),
1253 })
1254}
1255
1256pub async fn heartbeat_callback<'e, E>(
1265 executor: E,
1266 callback_id: Uuid,
1267 timeout: std::time::Duration,
1268) -> Result<JobRow, AwaError>
1269where
1270 E: PgExecutor<'e>,
1271{
1272 let timeout_secs = timeout.as_secs_f64();
1273 let row = sqlx::query_as::<_, JobRow>(
1274 r#"
1275 UPDATE awa.jobs
1276 SET callback_timeout_at = now() + make_interval(secs => $2)
1277 WHERE callback_id = $1 AND state = 'waiting_external'
1278 RETURNING *
1279 "#,
1280 )
1281 .bind(callback_id)
1282 .bind(timeout_secs)
1283 .fetch_optional(executor)
1284 .await?;
1285
1286 row.ok_or(AwaError::CallbackNotFound {
1287 callback_id: callback_id.to_string(),
1288 })
1289}
1290
1291pub async fn cancel_callback<'e, E>(
1297 executor: E,
1298 job_id: i64,
1299 run_lease: i64,
1300) -> Result<bool, AwaError>
1301where
1302 E: PgExecutor<'e>,
1303{
1304 let result = sqlx::query(
1305 r#"
1306 UPDATE awa.jobs
1307 SET callback_id = NULL,
1308 callback_timeout_at = NULL,
1309 callback_filter = NULL,
1310 callback_on_complete = NULL,
1311 callback_on_fail = NULL,
1312 callback_transform = NULL
1313 WHERE id = $1 AND callback_id IS NOT NULL AND state = 'running' AND run_lease = $2
1314 "#,
1315 )
1316 .bind(job_id)
1317 .bind(run_lease)
1318 .execute(executor)
1319 .await?;
1320
1321 Ok(result.rows_affected() > 0)
1322}
1323
1324#[derive(Debug)]
1332pub enum CallbackPollResult {
1333 Resolved(serde_json::Value),
1335 Pending,
1337 Stale {
1339 token: Uuid,
1340 current: Uuid,
1341 state: JobState,
1342 },
1343 UnexpectedState { token: Uuid, state: JobState },
1345 NotFound,
1347}
1348
1349pub async fn enter_callback_wait(
1354 pool: &PgPool,
1355 job_id: i64,
1356 run_lease: i64,
1357 callback_id: Uuid,
1358) -> Result<bool, AwaError> {
1359 let result = sqlx::query(
1360 r#"
1361 UPDATE awa.jobs
1362 SET state = 'waiting_external',
1363 heartbeat_at = NULL,
1364 deadline_at = NULL
1365 WHERE id = $1 AND state = 'running' AND run_lease = $2 AND callback_id = $3
1366 "#,
1367 )
1368 .bind(job_id)
1369 .bind(run_lease)
1370 .bind(callback_id)
1371 .execute(pool)
1372 .await?;
1373
1374 Ok(result.rows_affected() > 0)
1375}
1376
1377pub async fn check_callback_state(
1383 pool: &PgPool,
1384 job_id: i64,
1385 callback_id: Uuid,
1386) -> Result<CallbackPollResult, AwaError> {
1387 let row: Option<(JobState, Option<Uuid>, serde_json::Value)> =
1388 sqlx::query_as("SELECT state, callback_id, metadata FROM awa.jobs WHERE id = $1")
1389 .bind(job_id)
1390 .fetch_optional(pool)
1391 .await?;
1392
1393 match row {
1394 Some((JobState::Running, None, metadata))
1395 if metadata.get("_awa_callback_result").is_some() =>
1396 {
1397 let payload = take_callback_payload(pool, job_id, metadata).await?;
1398 Ok(CallbackPollResult::Resolved(payload))
1399 }
1400 Some((state, Some(current_callback_id), _)) if current_callback_id != callback_id => {
1401 Ok(CallbackPollResult::Stale {
1402 token: callback_id,
1403 current: current_callback_id,
1404 state,
1405 })
1406 }
1407 Some((JobState::WaitingExternal, Some(current), _)) if current == callback_id => {
1408 Ok(CallbackPollResult::Pending)
1409 }
1410 Some((state, _, _)) => Ok(CallbackPollResult::UnexpectedState {
1411 token: callback_id,
1412 state,
1413 }),
1414 None => Ok(CallbackPollResult::NotFound),
1415 }
1416}
1417
1418pub async fn take_callback_payload(
1420 pool: &PgPool,
1421 job_id: i64,
1422 metadata: serde_json::Value,
1423) -> Result<serde_json::Value, AwaError> {
1424 let payload = metadata
1425 .get("_awa_callback_result")
1426 .cloned()
1427 .unwrap_or(serde_json::Value::Null);
1428
1429 sqlx::query("UPDATE awa.jobs SET metadata = metadata - '_awa_callback_result' WHERE id = $1")
1430 .bind(job_id)
1431 .execute(pool)
1432 .await?;
1433
1434 Ok(payload)
1435}
1436
/// Optional expressions attached to a callback registration.
///
/// `register_callback_with_config` stores each field in the matching
/// `callback_*` column of the job row. When the `cel` feature is enabled it
/// first checks that every expression compiles as CEL and references no
/// variable other than `payload`.
#[derive(Debug, Clone, Default)]
pub struct CallbackConfig {
    /// Expression stored as the job's `callback_filter`.
    pub filter: Option<String>,
    /// Expression stored as the job's `callback_on_complete`.
    pub on_complete: Option<String>,
    /// Expression stored as the job's `callback_on_fail`.
    pub on_fail: Option<String>,
    /// Expression stored as the job's `callback_transform`.
    pub transform: Option<String>,
}

impl CallbackConfig {
    /// Returns `true` when none of the four expressions is set.
    pub fn is_empty(&self) -> bool {
        [
            &self.filter,
            &self.on_complete,
            &self.on_fail,
            &self.transform,
        ]
        .iter()
        .all(|expression| expression.is_none())
    }
}
1464
/// What `resolve_callback` should do with a callback when no configured
/// expression decides the outcome (see `apply_default`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DefaultAction {
    /// Complete the job, passing the incoming payload through unchanged.
    Complete,
    /// Fail the job with the error `callback failed: default action`.
    Fail,
    /// Leave the job untouched and report the callback as ignored.
    Ignore,
}
1473
1474#[derive(Debug)]
1476pub enum ResolveOutcome {
1477 Completed {
1478 payload: Option<serde_json::Value>,
1479 job: JobRow,
1480 },
1481 Failed {
1482 job: JobRow,
1483 },
1484 Ignored {
1485 reason: String,
1486 },
1487}
1488
1489impl ResolveOutcome {
1490 pub fn is_completed(&self) -> bool {
1491 matches!(self, ResolveOutcome::Completed { .. })
1492 }
1493 pub fn is_failed(&self) -> bool {
1494 matches!(self, ResolveOutcome::Failed { .. })
1495 }
1496 pub fn is_ignored(&self) -> bool {
1497 matches!(self, ResolveOutcome::Ignored { .. })
1498 }
1499}
1500
1501pub async fn register_callback_with_config<'e, E>(
1510 executor: E,
1511 job_id: i64,
1512 run_lease: i64,
1513 timeout: std::time::Duration,
1514 config: &CallbackConfig,
1515) -> Result<Uuid, AwaError>
1516where
1517 E: PgExecutor<'e>,
1518{
1519 #[cfg(feature = "cel")]
1521 {
1522 for (name, expr) in [
1523 ("filter", &config.filter),
1524 ("on_complete", &config.on_complete),
1525 ("on_fail", &config.on_fail),
1526 ("transform", &config.transform),
1527 ] {
1528 if let Some(src) = expr {
1529 let program = cel::Program::compile(src).map_err(|e| {
1530 AwaError::Validation(format!("invalid CEL expression for {name}: {e}"))
1531 })?;
1532
1533 let refs = program.references();
1537 let bad_vars: Vec<&str> = refs
1538 .variables()
1539 .into_iter()
1540 .filter(|v| *v != "payload")
1541 .collect();
1542 if !bad_vars.is_empty() {
1543 return Err(AwaError::Validation(format!(
1544 "CEL expression for {name} references undeclared variable(s): {}; \
1545 only 'payload' is available",
1546 bad_vars.join(", ")
1547 )));
1548 }
1549 }
1550 }
1551 }
1552
1553 #[cfg(not(feature = "cel"))]
1554 {
1555 if !config.is_empty() {
1556 return Err(AwaError::Validation(
1557 "CEL expressions require the 'cel' feature".into(),
1558 ));
1559 }
1560 }
1561
1562 let callback_id = Uuid::new_v4();
1563 let timeout_secs = timeout.as_secs_f64();
1564
1565 let result = sqlx::query(
1566 r#"UPDATE awa.jobs
1567 SET callback_id = $2,
1568 callback_timeout_at = now() + make_interval(secs => $3),
1569 callback_filter = $4,
1570 callback_on_complete = $5,
1571 callback_on_fail = $6,
1572 callback_transform = $7
1573 WHERE id = $1 AND state = 'running' AND run_lease = $8"#,
1574 )
1575 .bind(job_id)
1576 .bind(callback_id)
1577 .bind(timeout_secs)
1578 .bind(&config.filter)
1579 .bind(&config.on_complete)
1580 .bind(&config.on_fail)
1581 .bind(&config.transform)
1582 .bind(run_lease)
1583 .execute(executor)
1584 .await?;
1585
1586 if result.rows_affected() == 0 {
1587 return Err(AwaError::Validation("job is not in running state".into()));
1588 }
1589 Ok(callback_id)
1590}
1591
/// Internal decision on how a callback should be applied to its job, before
/// any database write happens.
enum ResolveAction {
    /// Complete the job; carries the payload to hand back to the caller.
    Complete(Option<serde_json::Value>),
    /// Fail the job.
    Fail {
        /// Error message recorded in the job's `errors`.
        error: String,
        /// Source of the expression that triggered the failure, if any.
        expression: Option<String>,
    },
    /// Do nothing; carries the reason reported to the caller.
    Ignore(String),
}
1601
1602pub async fn resolve_callback(
1608 pool: &PgPool,
1609 callback_id: Uuid,
1610 payload: Option<serde_json::Value>,
1611 default_action: DefaultAction,
1612 run_lease: Option<i64>,
1613) -> Result<ResolveOutcome, AwaError> {
1614 let mut tx = pool.begin().await?;
1615
1616 let job = sqlx::query_as::<_, JobRow>(
1625 "SELECT * FROM awa.jobs_hot WHERE callback_id = $1
1626 AND state IN ('waiting_external', 'running')
1627 AND ($2::bigint IS NULL OR run_lease = $2)
1628 FOR UPDATE",
1629 )
1630 .bind(callback_id)
1631 .bind(run_lease)
1632 .fetch_optional(&mut *tx)
1633 .await?
1634 .ok_or(AwaError::CallbackNotFound {
1635 callback_id: callback_id.to_string(),
1636 })?;
1637
1638 let action = evaluate_or_default(&job, &payload, default_action)?;
1639
1640 match action {
1641 ResolveAction::Complete(transformed_payload) => {
1642 let completed_job = sqlx::query_as::<_, JobRow>(
1643 r#"
1644 UPDATE awa.jobs
1645 SET state = 'completed',
1646 finalized_at = now(),
1647 callback_id = NULL,
1648 callback_timeout_at = NULL,
1649 callback_filter = NULL,
1650 callback_on_complete = NULL,
1651 callback_on_fail = NULL,
1652 callback_transform = NULL,
1653 heartbeat_at = NULL,
1654 deadline_at = NULL,
1655 progress = NULL
1656 WHERE id = $1
1657 RETURNING *
1658 "#,
1659 )
1660 .bind(job.id)
1661 .fetch_one(&mut *tx)
1662 .await?;
1663
1664 tx.commit().await?;
1665 Ok(ResolveOutcome::Completed {
1666 payload: transformed_payload,
1667 job: completed_job,
1668 })
1669 }
1670 ResolveAction::Fail { error, expression } => {
1671 let mut error_json = serde_json::json!({
1672 "error": error,
1673 "attempt": job.attempt,
1674 "at": chrono::Utc::now().to_rfc3339(),
1675 });
1676 if let Some(expr) = expression {
1677 error_json["expression"] = serde_json::Value::String(expr);
1678 }
1679
1680 let failed_job = sqlx::query_as::<_, JobRow>(
1681 r#"
1682 UPDATE awa.jobs
1683 SET state = 'failed',
1684 finalized_at = now(),
1685 callback_id = NULL,
1686 callback_timeout_at = NULL,
1687 callback_filter = NULL,
1688 callback_on_complete = NULL,
1689 callback_on_fail = NULL,
1690 callback_transform = NULL,
1691 heartbeat_at = NULL,
1692 deadline_at = NULL,
1693 errors = errors || $2::jsonb
1694 WHERE id = $1
1695 RETURNING *
1696 "#,
1697 )
1698 .bind(job.id)
1699 .bind(error_json)
1700 .fetch_one(&mut *tx)
1701 .await?;
1702
1703 tx.commit().await?;
1704 Ok(ResolveOutcome::Failed { job: failed_job })
1705 }
1706 ResolveAction::Ignore(reason) => {
1707 Ok(ResolveOutcome::Ignored { reason })
1709 }
1710 }
1711}
1712
1713fn evaluate_or_default(
1715 job: &JobRow,
1716 payload: &Option<serde_json::Value>,
1717 default_action: DefaultAction,
1718) -> Result<ResolveAction, AwaError> {
1719 let has_expressions = job.callback_filter.is_some()
1720 || job.callback_on_complete.is_some()
1721 || job.callback_on_fail.is_some()
1722 || job.callback_transform.is_some();
1723
1724 if !has_expressions {
1725 return Ok(apply_default(default_action, payload));
1726 }
1727
1728 #[cfg(feature = "cel")]
1729 {
1730 Ok(evaluate_cel(job, payload, default_action))
1731 }
1732
1733 #[cfg(not(feature = "cel"))]
1734 {
1735 let _ = (payload, default_action);
1738 Err(AwaError::Validation(
1739 "CEL expressions present but 'cel' feature is not enabled".into(),
1740 ))
1741 }
1742}
1743
1744fn apply_default(
1745 default_action: DefaultAction,
1746 payload: &Option<serde_json::Value>,
1747) -> ResolveAction {
1748 match default_action {
1749 DefaultAction::Complete => ResolveAction::Complete(payload.clone()),
1750 DefaultAction::Fail => ResolveAction::Fail {
1751 error: "callback failed: default action".to_string(),
1752 expression: None,
1753 },
1754 DefaultAction::Ignore => {
1755 ResolveAction::Ignore("no expressions configured, default is ignore".to_string())
1756 }
1757 }
1758}
1759
/// Evaluates the job's stored CEL expressions against `payload` and decides
/// how the callback should be applied.
///
/// A missing payload is evaluated as JSON `null`. The expressions are
/// consulted in this order, and the first one that decides wins:
///
/// 1. `callback_filter` evaluating to `false`: ignore.
/// 2. `callback_on_fail` evaluating to `true`: fail, recording the expression.
/// 3. `callback_on_complete` evaluating to `true`: complete with the payload
///    passed through `apply_transform`.
/// 4. Otherwise: `default_action`.
///
/// An expression that fails to evaluate (`eval_bool` returns `Err`, after
/// logging a warning) never decides anything: evaluation simply moves on to
/// the next step.
#[cfg(feature = "cel")]
fn evaluate_cel(
    job: &JobRow,
    payload: &Option<serde_json::Value>,
    default_action: DefaultAction,
) -> ResolveAction {
    let payload_value = payload.clone().unwrap_or(serde_json::Value::Null);

    if let Some(filter_expr) = &job.callback_filter {
        // Only an explicit `false` rejects the payload; `true` and
        // evaluation errors both let it through.
        if matches!(
            eval_bool(filter_expr, &payload_value, job.id, "filter"),
            Ok(false)
        ) {
            return ResolveAction::Ignore("filter expression returned false".to_string());
        }
    }

    if let Some(on_fail_expr) = &job.callback_on_fail {
        // Only an explicit `true` fails the job; errors are treated like `false`.
        if matches!(
            eval_bool(on_fail_expr, &payload_value, job.id, "on_fail"),
            Ok(true)
        ) {
            return ResolveAction::Fail {
                error: "callback failed: on_fail expression matched".to_string(),
                expression: Some(on_fail_expr.clone()),
            };
        }
    }

    if let Some(on_complete_expr) = &job.callback_on_complete {
        // Only an explicit `true` completes the job; errors are treated like `false`.
        if matches!(
            eval_bool(on_complete_expr, &payload_value, job.id, "on_complete"),
            Ok(true)
        ) {
            let transformed = apply_transform(job, &payload_value);
            return ResolveAction::Complete(Some(transformed));
        }
    }

    // No expression decided the outcome, so fall back to the default action.
    // NOTE(review): a default `Complete` forwards the raw payload here and
    // does not run `callback_transform` - confirm that this is intended.
    match default_action {
        // This function only runs when expressions ARE configured, so the
        // generic reason from `apply_default` ("no expressions configured")
        // would be wrong here.
        DefaultAction::Ignore => {
            ResolveAction::Ignore("no callback expression matched, default is ignore".to_string())
        }
        DefaultAction::Complete | DefaultAction::Fail => apply_default(default_action, payload),
    }
}
1815
/// Compiles and runs a CEL expression that is expected to yield a boolean.
///
/// The expression is executed with a single variable, `payload`, bound to
/// `payload_value`. `job_id` and `expression_name` are only used as fields in
/// the warning logs.
///
/// Returns `Ok(b)` when the expression evaluates to the boolean `b`. Every
/// failure (compilation error, failure to bind `payload`, execution error,
/// or a non-boolean result) is logged with `tracing::warn!` and reported as
/// `Err(())`.
#[cfg(feature = "cel")]
fn eval_bool(
    expression: &str,
    payload_value: &serde_json::Value,
    job_id: i64,
    expression_name: &str,
) -> Result<bool, ()> {
    let program = cel::Program::compile(expression).map_err(|e| {
        tracing::warn!(
            job_id,
            expression_name,
            expression,
            error = %e,
            "CEL compilation error during evaluation"
        );
    })?;

    let mut context = cel::Context::default();
    context
        .add_variable("payload", payload_value.clone())
        .map_err(|e| {
            tracing::warn!(
                job_id,
                expression_name,
                error = %e,
                "Failed to add payload variable to CEL context"
            );
        })?;

    match program.execute(&context) {
        Ok(cel::Value::Bool(verdict)) => Ok(verdict),
        Ok(other) => {
            tracing::warn!(
                job_id,
                expression_name,
                expression,
                result_type = ?other.type_of(),
                "CEL expression returned non-bool"
            );
            Err(())
        }
        Err(e) => {
            tracing::warn!(
                job_id,
                expression_name,
                expression,
                error = %e,
                "CEL execution error"
            );
            Err(())
        }
    }
}
1872
/// Applies the job's `callback_transform` expression to the payload.
///
/// When the job has no transform, a clone of `payload_value` is returned.
/// Otherwise the expression is run with `payload` bound to `payload_value`
/// and its result converted to JSON.
///
/// The transform is best-effort: if compilation, binding `payload`,
/// execution or the JSON conversion fails, a warning is logged and the
/// original payload is returned unchanged.
#[cfg(feature = "cel")]
fn apply_transform(job: &JobRow, payload_value: &serde_json::Value) -> serde_json::Value {
    let Some(transform_expr) = &job.callback_transform else {
        return payload_value.clone();
    };

    let compiled = cel::Program::compile(transform_expr).map_err(|e| {
        tracing::warn!(
            job_id = job.id,
            expression = transform_expr,
            error = %e,
            "CEL transform compilation error, using original payload"
        );
    });
    let Ok(program) = compiled else {
        return payload_value.clone();
    };

    let mut context = cel::Context::default();
    let bound = context
        .add_variable("payload", payload_value.clone())
        .map_err(|e| {
            tracing::warn!(
                job_id = job.id,
                error = %e,
                "Failed to add payload variable for transform"
            );
        });
    if bound.is_err() {
        return payload_value.clone();
    }

    let executed = program.execute(&context).map_err(|e| {
        tracing::warn!(
            job_id = job.id,
            expression = transform_expr,
            error = %e,
            "CEL transform execution error, using original payload"
        );
    });
    let Ok(value) = executed else {
        return payload_value.clone();
    };

    // Finish the conversion (and any logging) before falling back, so the
    // result no longer borrows from `value`.
    let converted = value.json().map_err(|e| {
        tracing::warn!(
            job_id = job.id,
            expression = transform_expr,
            error = %e,
            "CEL transform result could not be converted to JSON, using original payload"
        );
    });
    converted.unwrap_or_else(|()| payload_value.clone())
}