1use crate::error::AwaError;
2use crate::job::{JobRow, JobState};
3use chrono::{DateTime, Duration, Utc};
4use serde::{Deserialize, Serialize};
5use sqlx::types::Json;
6use sqlx::PgExecutor;
7use sqlx::PgPool;
8use std::cmp::max;
9use std::collections::HashMap;
10use uuid::Uuid;
11
12pub async fn retry<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
14where
15 E: PgExecutor<'e>,
16{
17 sqlx::query_as::<_, JobRow>(
18 r#"
19 UPDATE awa.jobs
20 SET state = 'available', attempt = 0, run_at = now(),
21 finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
22 callback_id = NULL, callback_timeout_at = NULL,
23 callback_filter = NULL, callback_on_complete = NULL,
24 callback_on_fail = NULL, callback_transform = NULL
25 WHERE id = $1 AND state IN ('failed', 'cancelled', 'waiting_external')
26 RETURNING *
27 "#,
28 )
29 .bind(job_id)
30 .fetch_optional(executor)
31 .await?
32 .ok_or(AwaError::JobNotFound { id: job_id })
33 .map(Some)
34}
35
36pub async fn cancel<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
38where
39 E: PgExecutor<'e>,
40{
41 sqlx::query_as::<_, JobRow>(
42 r#"
43 UPDATE awa.jobs
44 SET state = 'cancelled', finalized_at = now(),
45 callback_id = NULL, callback_timeout_at = NULL,
46 callback_filter = NULL, callback_on_complete = NULL,
47 callback_on_fail = NULL, callback_transform = NULL
48 WHERE id = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
49 RETURNING *
50 "#,
51 )
52 .bind(job_id)
53 .fetch_optional(executor)
54 .await?
55 .ok_or(AwaError::JobNotFound { id: job_id })
56 .map(Some)
57}
58
59pub async fn cancel_by_unique_key<'e, E>(
95 executor: E,
96 kind: &str,
97 queue: Option<&str>,
98 args: Option<&serde_json::Value>,
99 period_bucket: Option<i64>,
100) -> Result<Option<JobRow>, AwaError>
101where
102 E: PgExecutor<'e>,
103{
104 let unique_key = crate::unique::compute_unique_key(kind, queue, args, period_bucket);
105
106 let row = sqlx::query_as::<_, JobRow>(
110 r#"
111 WITH candidates AS (
112 SELECT id FROM awa.jobs_hot
113 WHERE unique_key = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
114 UNION ALL
115 SELECT id FROM awa.scheduled_jobs
116 WHERE unique_key = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
117 ORDER BY id ASC
118 LIMIT 1
119 )
120 UPDATE awa.jobs
121 SET state = 'cancelled', finalized_at = now(),
122 callback_id = NULL, callback_timeout_at = NULL,
123 callback_filter = NULL, callback_on_complete = NULL,
124 callback_on_fail = NULL, callback_transform = NULL
125 FROM candidates
126 WHERE awa.jobs.id = candidates.id
127 RETURNING awa.jobs.*
128 "#,
129 )
130 .bind(&unique_key)
131 .fetch_optional(executor)
132 .await?;
133
134 Ok(row)
135}
136
137pub async fn retry_failed_by_kind<'e, E>(executor: E, kind: &str) -> Result<Vec<JobRow>, AwaError>
139where
140 E: PgExecutor<'e>,
141{
142 let rows = sqlx::query_as::<_, JobRow>(
143 r#"
144 UPDATE awa.jobs
145 SET state = 'available', attempt = 0, run_at = now(),
146 finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
147 WHERE kind = $1 AND state = 'failed'
148 RETURNING *
149 "#,
150 )
151 .bind(kind)
152 .fetch_all(executor)
153 .await?;
154
155 Ok(rows)
156}
157
158pub async fn retry_failed_by_queue<'e, E>(executor: E, queue: &str) -> Result<Vec<JobRow>, AwaError>
160where
161 E: PgExecutor<'e>,
162{
163 let rows = sqlx::query_as::<_, JobRow>(
164 r#"
165 UPDATE awa.jobs
166 SET state = 'available', attempt = 0, run_at = now(),
167 finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
168 WHERE queue = $1 AND state = 'failed'
169 RETURNING *
170 "#,
171 )
172 .bind(queue)
173 .fetch_all(executor)
174 .await?;
175
176 Ok(rows)
177}
178
179pub async fn discard_failed<'e, E>(executor: E, kind: &str) -> Result<u64, AwaError>
181where
182 E: PgExecutor<'e>,
183{
184 let result = sqlx::query("DELETE FROM awa.jobs WHERE kind = $1 AND state = 'failed'")
185 .bind(kind)
186 .execute(executor)
187 .await?;
188
189 Ok(result.rows_affected())
190}
191
192pub async fn pause_queue<'e, E>(
194 executor: E,
195 queue: &str,
196 paused_by: Option<&str>,
197) -> Result<(), AwaError>
198where
199 E: PgExecutor<'e>,
200{
201 sqlx::query(
202 r#"
203 INSERT INTO awa.queue_meta (queue, paused, paused_at, paused_by)
204 VALUES ($1, TRUE, now(), $2)
205 ON CONFLICT (queue) DO UPDATE SET paused = TRUE, paused_at = now(), paused_by = $2
206 "#,
207 )
208 .bind(queue)
209 .bind(paused_by)
210 .execute(executor)
211 .await?;
212
213 Ok(())
214}
215
216pub async fn resume_queue<'e, E>(executor: E, queue: &str) -> Result<(), AwaError>
218where
219 E: PgExecutor<'e>,
220{
221 sqlx::query("UPDATE awa.queue_meta SET paused = FALSE WHERE queue = $1")
222 .bind(queue)
223 .execute(executor)
224 .await?;
225
226 Ok(())
227}
228
229pub async fn drain_queue<'e, E>(executor: E, queue: &str) -> Result<u64, AwaError>
231where
232 E: PgExecutor<'e>,
233{
234 let result = sqlx::query(
235 r#"
236 UPDATE awa.jobs
237 SET state = 'cancelled', finalized_at = now(),
238 callback_id = NULL, callback_timeout_at = NULL,
239 callback_filter = NULL, callback_on_complete = NULL,
240 callback_on_fail = NULL, callback_transform = NULL
241 WHERE queue = $1 AND state IN ('available', 'scheduled', 'retryable', 'waiting_external')
242 "#,
243 )
244 .bind(queue)
245 .execute(executor)
246 .await?;
247
248 Ok(result.rows_affected())
249}
250
/// Per-queue job counts and health figures, as returned by `queue_stats`.
#[derive(Debug, Clone, Serialize)]
pub struct QueueStats {
    /// Queue name.
    pub queue: String,
    /// Sum of `scheduled`, `available`, `running`, `retryable` and
    /// `waiting_external` (computed inside the `queue_stats` query).
    pub total_queued: i64,
    /// Number of jobs in the `scheduled` state.
    pub scheduled: i64,
    /// Number of jobs in the `available` state.
    pub available: i64,
    /// Number of jobs in the `retryable` state.
    pub retryable: i64,
    /// Number of jobs in the `running` state.
    pub running: i64,
    /// Number of jobs in the `failed` state.
    pub failed: i64,
    /// Number of jobs in the `waiting_external` state.
    pub waiting_external: i64,
    /// Completed jobs whose `finalized_at` falls within the last hour.
    pub completed_last_hour: i64,
    /// Seconds since the earliest `run_at` among the queue's `available`
    /// jobs; `None` when the query found no such job.
    pub lag_seconds: Option<f64>,
    /// Whether the queue is flagged as paused in `awa.queue_meta`
    /// (`false` when the queue has no metadata row).
    pub paused: bool,
}
267
/// Rate-limit settings reported as part of a queue's runtime configuration.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RateLimitSnapshot {
    /// Configured maximum rate.
    /// NOTE(review): the unit (presumably jobs per second) is not visible here — confirm.
    pub max_rate: f64,
    /// Configured burst allowance.
    /// NOTE(review): exact semantics are defined by the rate limiter, not in this file.
    pub burst: u32,
}
274
/// Worker-allocation mode a queue runs under.
///
/// Serialized in snake case, i.e. `"hard_reserved"` and `"weighted"`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum QueueRuntimeMode {
    /// NOTE(review): presumably a fixed worker reservation per queue — confirm
    /// against the worker runtime.
    HardReserved,
    /// Weighted mode. `queue_runtime_summary` only reports an overflow total
    /// for queues whose representative config uses this mode.
    Weighted,
}
282
/// Configuration a worker instance reports for one queue.
///
/// Compared with `==` across instances by `queue_runtime_summary` to detect
/// configuration mismatches.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueueRuntimeConfigSnapshot {
    /// Allocation mode of the queue.
    pub mode: QueueRuntimeMode,
    /// Worker cap for the queue, if one is reported.
    pub max_workers: Option<u32>,
    /// Worker floor for the queue, if one is reported.
    pub min_workers: Option<u32>,
    /// Weight of the queue, if one is reported.
    /// NOTE(review): presumably only meaningful in `Weighted` mode — confirm.
    pub weight: Option<u32>,
    /// Instance-wide worker cap, if one is reported.
    pub global_max_workers: Option<u32>,
    /// Poll interval in milliseconds.
    pub poll_interval_ms: u64,
    /// Deadline duration in seconds.
    pub deadline_duration_secs: u64,
    /// Priority aging interval in seconds.
    pub priority_aging_interval_secs: u64,
    /// Rate-limit settings, if any are reported.
    pub rate_limit: Option<RateLimitSnapshot>,
}
296
/// Runtime state of one queue as reported by one worker instance.
///
/// Stored as an element of the JSON `queues` column of
/// `awa.runtime_instances`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueueRuntimeSnapshot {
    /// Queue name; used as the grouping key in `queue_runtime_summary`.
    pub queue: String,
    /// Reported in-flight job count for this queue on this instance.
    pub in_flight: u32,
    /// Reported overflow-held count, when present.
    /// NOTE(review): exact meaning is defined by the worker runtime — confirm.
    pub overflow_held: Option<u32>,
    /// Configuration the instance reports for this queue.
    pub config: QueueRuntimeConfigSnapshot,
}
305
/// Snapshot of a worker instance, as written by `upsert_runtime_snapshot`.
///
/// Every field is stored in the column of the same name in
/// `awa.runtime_instances`; `last_seen_at` is not part of the input because
/// the upsert sets it to `now()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeSnapshotInput {
    /// Identifier of the instance; the upsert's conflict key.
    pub instance_id: Uuid,
    /// Host name of the instance, if known.
    pub hostname: Option<String>,
    /// Process id of the instance.
    pub pid: i32,
    /// Version string reported by the instance.
    pub version: String,
    /// When the instance started.
    pub started_at: DateTime<Utc>,
    /// Interval between snapshots in milliseconds; used when reading the row
    /// back to decide whether the instance is stale.
    pub snapshot_interval_ms: i64,
    /// Overall health flag reported by the instance.
    pub healthy: bool,
    /// Whether the instance reports a working Postgres connection.
    pub postgres_connected: bool,
    /// Whether the instance reports its poll loop as alive.
    pub poll_loop_alive: bool,
    /// Whether the instance reports its heartbeat task as alive.
    pub heartbeat_alive: bool,
    /// Whether the instance reports its maintenance task as alive.
    pub maintenance_alive: bool,
    /// Whether the instance reports that it is shutting down.
    pub shutting_down: bool,
    /// Whether the instance reports itself as leader.
    pub leader: bool,
    /// Instance-wide worker cap, if any. Stored via an `as i32` cast.
    pub global_max_workers: Option<u32>,
    /// Per-queue runtime state; stored as JSON.
    pub queues: Vec<QueueRuntimeSnapshot>,
}
325
/// A worker instance as read back from `awa.runtime_instances`.
///
/// Mirrors the stored snapshot and adds the derived `stale` flag.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeInstance {
    /// Identifier of the instance.
    pub instance_id: Uuid,
    /// Host name of the instance, if known.
    pub hostname: Option<String>,
    /// Process id of the instance.
    pub pid: i32,
    /// Version string reported by the instance.
    pub version: String,
    /// When the instance started.
    pub started_at: DateTime<Utc>,
    /// When the instance's snapshot was last written.
    pub last_seen_at: DateTime<Utc>,
    /// Interval between snapshots in milliseconds.
    pub snapshot_interval_ms: i64,
    /// Derived when the row is read: `true` once `last_seen_at` is older than
    /// the staleness cutoff computed from `snapshot_interval_ms`.
    pub stale: bool,
    /// Overall health flag reported by the instance.
    pub healthy: bool,
    /// Whether the instance reports a working Postgres connection.
    pub postgres_connected: bool,
    /// Whether the instance reports its poll loop as alive.
    pub poll_loop_alive: bool,
    /// Whether the instance reports its heartbeat task as alive.
    pub heartbeat_alive: bool,
    /// Whether the instance reports its maintenance task as alive.
    pub maintenance_alive: bool,
    /// Whether the instance reports that it is shutting down.
    pub shutting_down: bool,
    /// Whether the instance reports itself as leader.
    pub leader: bool,
    /// Instance-wide worker cap, if any.
    pub global_max_workers: Option<u32>,
    /// Per-queue runtime state reported by the instance.
    pub queues: Vec<QueueRuntimeSnapshot>,
}
347
348impl RuntimeInstance {
349 fn stale_cutoff(interval_ms: i64) -> Duration {
350 let interval_ms = max(interval_ms, 1_000);
351 Duration::milliseconds(max(interval_ms.saturating_mul(3), 30_000))
352 }
353
354 fn from_db_row(row: RuntimeInstanceRow, now: DateTime<Utc>) -> Self {
355 let stale = row.last_seen_at + Self::stale_cutoff(row.snapshot_interval_ms) < now;
356 Self {
357 instance_id: row.instance_id,
358 hostname: row.hostname,
359 pid: row.pid,
360 version: row.version,
361 started_at: row.started_at,
362 last_seen_at: row.last_seen_at,
363 snapshot_interval_ms: row.snapshot_interval_ms,
364 stale,
365 healthy: row.healthy,
366 postgres_connected: row.postgres_connected,
367 poll_loop_alive: row.poll_loop_alive,
368 heartbeat_alive: row.heartbeat_alive,
369 maintenance_alive: row.maintenance_alive,
370 shutting_down: row.shutting_down,
371 leader: row.leader,
372 global_max_workers: row.global_max_workers.map(|v| v as u32),
373 queues: row.queues.0,
374 }
375 }
376}
377
/// Cluster-wide roll-up of worker instances, as returned by `runtime_overview`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeOverview {
    /// Number of instance rows found.
    pub total_instances: usize,
    /// Instances that are not stale (`total_instances - stale_instances`).
    pub live_instances: usize,
    /// Instances whose `stale` flag is set.
    pub stale_instances: usize,
    /// Instances that are both non-stale and report `healthy`.
    pub healthy_instances: usize,
    /// Instances that are both non-stale and report `leader`.
    pub leader_instances: usize,
    /// The instances themselves, in the order `list_runtime_instances`
    /// returned them.
    pub instances: Vec<RuntimeInstance>,
}
388
/// Per-queue roll-up across worker instances, as returned by
/// `queue_runtime_summary`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueRuntimeSummary {
    /// Queue name.
    pub queue: String,
    /// Number of instance snapshots that mention this queue.
    pub instance_count: usize,
    /// How many of those come from non-stale instances.
    pub live_instances: usize,
    /// How many of those come from stale instances.
    pub stale_instances: usize,
    /// How many come from instances that are non-stale and report `healthy`.
    pub healthy_instances: usize,
    /// Sum of `in_flight` over non-stale instances.
    pub total_in_flight: u64,
    /// Sum of `overflow_held` over non-stale instances; only populated when
    /// the representative `config` is in `Weighted` mode.
    pub overflow_held_total: Option<u64>,
    /// `true` when the considered instances do not all report the same config.
    pub config_mismatch: bool,
    /// Representative config: the first one reported by a non-stale instance,
    /// or the first one overall when every instance is stale.
    pub config: Option<QueueRuntimeConfigSnapshot>,
}
402
/// Raw row shape of `awa.runtime_instances` as selected by
/// `list_runtime_instances`; converted into [`RuntimeInstance`] via
/// `RuntimeInstance::from_db_row`.
#[derive(Debug, sqlx::FromRow)]
struct RuntimeInstanceRow {
    instance_id: Uuid,
    hostname: Option<String>,
    pid: i32,
    version: String,
    started_at: DateTime<Utc>,
    last_seen_at: DateTime<Utc>,
    snapshot_interval_ms: i64,
    healthy: bool,
    postgres_connected: bool,
    poll_loop_alive: bool,
    heartbeat_alive: bool,
    maintenance_alive: bool,
    shutting_down: bool,
    leader: bool,
    // Read as a signed integer; cast back to `u32` during conversion.
    global_max_workers: Option<i32>,
    // JSON column decoded straight into the per-queue snapshots.
    queues: Json<Vec<QueueRuntimeSnapshot>>,
}
422
423pub async fn upsert_runtime_snapshot<'e, E>(
425 executor: E,
426 snapshot: &RuntimeSnapshotInput,
427) -> Result<(), AwaError>
428where
429 E: PgExecutor<'e>,
430{
431 sqlx::query(
432 r#"
433 INSERT INTO awa.runtime_instances (
434 instance_id,
435 hostname,
436 pid,
437 version,
438 started_at,
439 last_seen_at,
440 snapshot_interval_ms,
441 healthy,
442 postgres_connected,
443 poll_loop_alive,
444 heartbeat_alive,
445 maintenance_alive,
446 shutting_down,
447 leader,
448 global_max_workers,
449 queues
450 )
451 VALUES (
452 $1, $2, $3, $4, $5, now(), $6, $7, $8, $9, $10, $11, $12, $13, $14, $15
453 )
454 ON CONFLICT (instance_id) DO UPDATE SET
455 hostname = EXCLUDED.hostname,
456 pid = EXCLUDED.pid,
457 version = EXCLUDED.version,
458 started_at = EXCLUDED.started_at,
459 last_seen_at = now(),
460 snapshot_interval_ms = EXCLUDED.snapshot_interval_ms,
461 healthy = EXCLUDED.healthy,
462 postgres_connected = EXCLUDED.postgres_connected,
463 poll_loop_alive = EXCLUDED.poll_loop_alive,
464 heartbeat_alive = EXCLUDED.heartbeat_alive,
465 maintenance_alive = EXCLUDED.maintenance_alive,
466 shutting_down = EXCLUDED.shutting_down,
467 leader = EXCLUDED.leader,
468 global_max_workers = EXCLUDED.global_max_workers,
469 queues = EXCLUDED.queues
470 "#,
471 )
472 .bind(snapshot.instance_id)
473 .bind(snapshot.hostname.as_deref())
474 .bind(snapshot.pid)
475 .bind(&snapshot.version)
476 .bind(snapshot.started_at)
477 .bind(snapshot.snapshot_interval_ms)
478 .bind(snapshot.healthy)
479 .bind(snapshot.postgres_connected)
480 .bind(snapshot.poll_loop_alive)
481 .bind(snapshot.heartbeat_alive)
482 .bind(snapshot.maintenance_alive)
483 .bind(snapshot.shutting_down)
484 .bind(snapshot.leader)
485 .bind(snapshot.global_max_workers.map(|v| v as i32))
486 .bind(Json(&snapshot.queues))
487 .execute(executor)
488 .await?;
489
490 Ok(())
491}
492
493pub async fn cleanup_runtime_snapshots<'e, E>(
495 executor: E,
496 max_age: Duration,
497) -> Result<u64, AwaError>
498where
499 E: PgExecutor<'e>,
500{
501 let seconds = max(max_age.num_seconds(), 1);
502 let result = sqlx::query(
503 "DELETE FROM awa.runtime_instances WHERE last_seen_at < now() - make_interval(secs => $1)",
504 )
505 .bind(seconds)
506 .execute(executor)
507 .await?;
508
509 Ok(result.rows_affected())
510}
511
512pub async fn list_runtime_instances<'e, E>(executor: E) -> Result<Vec<RuntimeInstance>, AwaError>
514where
515 E: PgExecutor<'e>,
516{
517 let rows = sqlx::query_as::<_, RuntimeInstanceRow>(
518 r#"
519 SELECT
520 instance_id,
521 hostname,
522 pid,
523 version,
524 started_at,
525 last_seen_at,
526 snapshot_interval_ms,
527 healthy,
528 postgres_connected,
529 poll_loop_alive,
530 heartbeat_alive,
531 maintenance_alive,
532 shutting_down,
533 leader,
534 global_max_workers,
535 queues
536 FROM awa.runtime_instances
537 ORDER BY leader DESC, last_seen_at DESC, started_at DESC
538 "#,
539 )
540 .fetch_all(executor)
541 .await?;
542
543 let now = Utc::now();
544 Ok(rows
545 .into_iter()
546 .map(|row| RuntimeInstance::from_db_row(row, now))
547 .collect())
548}
549
550pub async fn runtime_overview<'e, E>(executor: E) -> Result<RuntimeOverview, AwaError>
552where
553 E: PgExecutor<'e>,
554{
555 let instances = list_runtime_instances(executor).await?;
556 let total_instances = instances.len();
557 let stale_instances = instances.iter().filter(|i| i.stale).count();
558 let live_instances = total_instances.saturating_sub(stale_instances);
559 let healthy_instances = instances.iter().filter(|i| !i.stale && i.healthy).count();
560 let leader_instances = instances.iter().filter(|i| !i.stale && i.leader).count();
561
562 Ok(RuntimeOverview {
563 total_instances,
564 live_instances,
565 stale_instances,
566 healthy_instances,
567 leader_instances,
568 instances,
569 })
570}
571
572pub async fn queue_runtime_summary<'e, E>(executor: E) -> Result<Vec<QueueRuntimeSummary>, AwaError>
574where
575 E: PgExecutor<'e>,
576{
577 let instances = list_runtime_instances(executor).await?;
578 let mut by_queue: HashMap<String, Vec<(bool, bool, QueueRuntimeSnapshot)>> = HashMap::new();
579
580 for instance in instances {
581 let is_live = !instance.stale;
582 let is_healthy = is_live && instance.healthy;
583 for queue in instance.queues {
584 by_queue
585 .entry(queue.queue.clone())
586 .or_default()
587 .push((is_live, is_healthy, queue));
588 }
589 }
590
591 let mut summaries: Vec<_> = by_queue
592 .into_iter()
593 .map(|(queue, entries)| {
594 let instance_count = entries.len();
595 let live_instances = entries.iter().filter(|(live, _, _)| *live).count();
596 let stale_instances = instance_count.saturating_sub(live_instances);
597 let healthy_instances = entries.iter().filter(|(_, healthy, _)| *healthy).count();
598 let total_in_flight = entries
599 .iter()
600 .filter(|(live, _, _)| *live)
601 .map(|(_, _, queue)| u64::from(queue.in_flight))
602 .sum();
603
604 let overflow_total: u64 = entries
605 .iter()
606 .filter(|(live, _, _)| *live)
607 .filter_map(|(_, _, queue)| queue.overflow_held.map(u64::from))
608 .sum();
609
610 let live_configs: Vec<_> = entries
611 .iter()
612 .filter(|(live, _, _)| *live)
613 .map(|(_, _, queue)| queue.config.clone())
614 .collect();
615 let config_candidates = if live_configs.is_empty() {
616 entries
617 .iter()
618 .map(|(_, _, queue)| queue.config.clone())
619 .collect::<Vec<_>>()
620 } else {
621 live_configs
622 };
623 let config = config_candidates.first().cloned();
624 let config_mismatch = config_candidates
625 .iter()
626 .skip(1)
627 .any(|candidate| Some(candidate) != config.as_ref());
628
629 QueueRuntimeSummary {
630 queue,
631 instance_count,
632 live_instances,
633 stale_instances,
634 healthy_instances,
635 total_in_flight,
636 overflow_held_total: config
637 .as_ref()
638 .filter(|cfg| cfg.mode == QueueRuntimeMode::Weighted)
639 .map(|_| overflow_total),
640 config_mismatch,
641 config,
642 }
643 })
644 .collect();
645
646 summaries.sort_by(|a, b| a.queue.cmp(&b.queue));
647 Ok(summaries)
648}
649
650pub async fn queue_stats<'e, E>(executor: E) -> Result<Vec<QueueStats>, AwaError>
652where
653 E: PgExecutor<'e>,
654{
655 let rows = sqlx::query_as::<
656 _,
657 (
658 String,
659 i64,
660 i64,
661 i64,
662 i64,
663 i64,
664 i64,
665 i64,
666 i64,
667 Option<f64>,
668 bool,
669 ),
670 >(
671 r#"
672 WITH available_lag AS (
673 SELECT
674 queue,
675 EXTRACT(EPOCH FROM (now() - min(run_at)))::float8 AS lag_seconds
676 FROM awa.jobs_hot
677 WHERE state = 'available'
678 GROUP BY queue
679 ),
680 completed_recent AS (
681 SELECT
682 queue,
683 count(*)::bigint AS completed_last_hour
684 FROM awa.jobs_hot
685 WHERE state = 'completed'
686 AND finalized_at > now() - interval '1 hour'
687 GROUP BY queue
688 )
689 SELECT
690 qs.queue,
691 qs.scheduled + qs.available + qs.running + qs.retryable + qs.waiting_external AS total_queued,
692 qs.scheduled,
693 qs.available,
694 qs.retryable,
695 qs.running,
696 qs.failed,
697 qs.waiting_external,
698 COALESCE(cr.completed_last_hour, 0) AS completed_last_hour,
699 al.lag_seconds,
700 COALESCE(qm.paused, FALSE) AS paused
701 FROM awa.queue_state_counts qs
702 LEFT JOIN available_lag al ON al.queue = qs.queue
703 LEFT JOIN completed_recent cr ON cr.queue = qs.queue
704 LEFT JOIN awa.queue_meta qm ON qm.queue = qs.queue
705 ORDER BY qs.queue
706 "#,
707 )
708 .fetch_all(executor)
709 .await?;
710
711 Ok(rows
712 .into_iter()
713 .map(
714 |(
715 queue,
716 total_queued,
717 scheduled,
718 available,
719 retryable,
720 running,
721 failed,
722 waiting_external,
723 completed_last_hour,
724 lag_seconds,
725 paused,
726 )| QueueStats {
727 queue,
728 total_queued,
729 scheduled,
730 available,
731 retryable,
732 running,
733 failed,
734 waiting_external,
735 completed_last_hour,
736 lag_seconds,
737 paused,
738 },
739 )
740 .collect())
741}
742
/// Optional filters for `list_jobs`.
///
/// Every `None` field is ignored; all fields that are set must match.
#[derive(Debug, Clone, Default, Serialize)]
pub struct ListJobsFilter {
    /// Only jobs in this state.
    pub state: Option<JobState>,
    /// Only jobs of this kind.
    pub kind: Option<String>,
    /// Only jobs in this queue.
    pub queue: Option<String>,
    /// Only jobs whose `tags` array contains this tag.
    pub tag: Option<String>,
    /// Only jobs with an id strictly below this value; since results are
    /// ordered by id descending, this acts as a paging cursor.
    pub before_id: Option<i64>,
    /// Maximum number of rows to return; `list_jobs` uses 100 when unset.
    pub limit: Option<i64>,
}
753
754pub async fn list_jobs<'e, E>(executor: E, filter: &ListJobsFilter) -> Result<Vec<JobRow>, AwaError>
756where
757 E: PgExecutor<'e>,
758{
759 let limit = filter.limit.unwrap_or(100);
760
761 let rows = sqlx::query_as::<_, JobRow>(
762 r#"
763 SELECT * FROM awa.jobs
764 WHERE ($1::awa.job_state IS NULL OR state = $1)
765 AND ($2::text IS NULL OR kind = $2)
766 AND ($3::text IS NULL OR queue = $3)
767 AND ($4::text IS NULL OR tags @> ARRAY[$4]::text[])
768 AND ($5::bigint IS NULL OR id < $5)
769 ORDER BY id DESC
770 LIMIT $6
771 "#,
772 )
773 .bind(filter.state)
774 .bind(&filter.kind)
775 .bind(&filter.queue)
776 .bind(&filter.tag)
777 .bind(filter.before_id)
778 .bind(limit)
779 .fetch_all(executor)
780 .await?;
781
782 Ok(rows)
783}
784
785pub async fn get_job<'e, E>(executor: E, job_id: i64) -> Result<JobRow, AwaError>
787where
788 E: PgExecutor<'e>,
789{
790 let row = sqlx::query_as::<_, JobRow>("SELECT * FROM awa.jobs WHERE id = $1")
791 .bind(job_id)
792 .fetch_optional(executor)
793 .await?;
794
795 row.ok_or(AwaError::JobNotFound { id: job_id })
796}
797
798pub async fn state_counts<'e, E>(executor: E) -> Result<HashMap<JobState, i64>, AwaError>
800where
801 E: PgExecutor<'e>,
802{
803 let rows = sqlx::query_as::<_, (JobState, i64)>(
806 r#"
807 SELECT v.state, v.total FROM (
808 SELECT
809 COALESCE(sum(scheduled), 0)::bigint AS scheduled,
810 COALESCE(sum(available), 0)::bigint AS available,
811 COALESCE(sum(running), 0)::bigint AS running,
812 COALESCE(sum(completed), 0)::bigint AS completed,
813 COALESCE(sum(retryable), 0)::bigint AS retryable,
814 COALESCE(sum(failed), 0)::bigint AS failed,
815 COALESCE(sum(cancelled), 0)::bigint AS cancelled,
816 COALESCE(sum(waiting_external), 0)::bigint AS waiting_external
817 FROM awa.queue_state_counts
818 ) s,
819 LATERAL (VALUES
820 ('scheduled'::awa.job_state, s.scheduled),
821 ('available'::awa.job_state, s.available),
822 ('running'::awa.job_state, s.running),
823 ('completed'::awa.job_state, s.completed),
824 ('retryable'::awa.job_state, s.retryable),
825 ('failed'::awa.job_state, s.failed),
826 ('cancelled'::awa.job_state, s.cancelled),
827 ('waiting_external'::awa.job_state, s.waiting_external)
828 ) AS v(state, total)
829 "#,
830 )
831 .fetch_all(executor)
832 .await?;
833
834 Ok(rows.into_iter().collect())
835}
836
837pub async fn distinct_kinds<'e, E>(executor: E) -> Result<Vec<String>, AwaError>
839where
840 E: PgExecutor<'e>,
841{
842 let rows = sqlx::query_scalar::<_, String>(
843 "SELECT kind FROM awa.job_kind_catalog WHERE ref_count > 0 ORDER BY kind",
844 )
845 .fetch_all(executor)
846 .await?;
847
848 Ok(rows)
849}
850
851pub async fn distinct_queues<'e, E>(executor: E) -> Result<Vec<String>, AwaError>
853where
854 E: PgExecutor<'e>,
855{
856 let rows = sqlx::query_scalar::<_, String>(
857 "SELECT queue FROM awa.job_queue_catalog WHERE ref_count > 0 ORDER BY queue",
858 )
859 .fetch_all(executor)
860 .await?;
861
862 Ok(rows)
863}
864
865pub async fn bulk_retry<'e, E>(executor: E, ids: &[i64]) -> Result<Vec<JobRow>, AwaError>
867where
868 E: PgExecutor<'e>,
869{
870 let rows = sqlx::query_as::<_, JobRow>(
871 r#"
872 UPDATE awa.jobs
873 SET state = 'available', attempt = 0, run_at = now(),
874 finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
875 callback_id = NULL, callback_timeout_at = NULL,
876 callback_filter = NULL, callback_on_complete = NULL,
877 callback_on_fail = NULL, callback_transform = NULL
878 WHERE id = ANY($1) AND state IN ('failed', 'cancelled', 'waiting_external')
879 RETURNING *
880 "#,
881 )
882 .bind(ids)
883 .fetch_all(executor)
884 .await?;
885
886 Ok(rows)
887}
888
889pub async fn bulk_cancel<'e, E>(executor: E, ids: &[i64]) -> Result<Vec<JobRow>, AwaError>
891where
892 E: PgExecutor<'e>,
893{
894 let rows = sqlx::query_as::<_, JobRow>(
895 r#"
896 UPDATE awa.jobs
897 SET state = 'cancelled', finalized_at = now(),
898 callback_id = NULL, callback_timeout_at = NULL,
899 callback_filter = NULL, callback_on_complete = NULL,
900 callback_on_fail = NULL, callback_transform = NULL
901 WHERE id = ANY($1) AND state NOT IN ('completed', 'failed', 'cancelled')
902 RETURNING *
903 "#,
904 )
905 .bind(ids)
906 .fetch_all(executor)
907 .await?;
908
909 Ok(rows)
910}
911
/// One data point of the series returned by `state_timeseries`.
#[derive(Debug, Clone, Serialize)]
pub struct StateTimeseriesBucket {
    /// Start of the one-minute bucket (job `created_at` truncated to the minute).
    pub bucket: chrono::DateTime<chrono::Utc>,
    /// Job state this count applies to.
    pub state: JobState,
    /// Number of jobs created in the bucket that are in `state`.
    pub count: i64,
}
919
920pub async fn state_timeseries<'e, E>(
922 executor: E,
923 minutes: i32,
924) -> Result<Vec<StateTimeseriesBucket>, AwaError>
925where
926 E: PgExecutor<'e>,
927{
928 let rows = sqlx::query_as::<_, (chrono::DateTime<chrono::Utc>, JobState, i64)>(
929 r#"
930 SELECT
931 date_trunc('minute', created_at) AS bucket,
932 state,
933 count(*) AS count
934 FROM awa.jobs
935 WHERE created_at >= now() - make_interval(mins => $1)
936 GROUP BY bucket, state
937 ORDER BY bucket
938 "#,
939 )
940 .bind(minutes)
941 .fetch_all(executor)
942 .await?;
943
944 Ok(rows
945 .into_iter()
946 .map(|(bucket, state, count)| StateTimeseriesBucket {
947 bucket,
948 state,
949 count,
950 })
951 .collect())
952}
953
954pub async fn register_callback<'e, E>(
963 executor: E,
964 job_id: i64,
965 run_lease: i64,
966 timeout: std::time::Duration,
967) -> Result<Uuid, AwaError>
968where
969 E: PgExecutor<'e>,
970{
971 let callback_id = Uuid::new_v4();
972 let timeout_secs = timeout.as_secs_f64();
973 let result = sqlx::query(
974 r#"UPDATE awa.jobs
975 SET callback_id = $2,
976 callback_timeout_at = now() + make_interval(secs => $3),
977 callback_filter = NULL,
978 callback_on_complete = NULL,
979 callback_on_fail = NULL,
980 callback_transform = NULL
981 WHERE id = $1 AND state = 'running' AND run_lease = $4"#,
982 )
983 .bind(job_id)
984 .bind(callback_id)
985 .bind(timeout_secs)
986 .bind(run_lease)
987 .execute(executor)
988 .await?;
989 if result.rows_affected() == 0 {
990 return Err(AwaError::Validation("job is not in running state".into()));
991 }
992 Ok(callback_id)
993}
994
995pub async fn complete_external<'e, E>(
1006 executor: E,
1007 callback_id: Uuid,
1008 payload: Option<serde_json::Value>,
1009 run_lease: Option<i64>,
1010) -> Result<JobRow, AwaError>
1011where
1012 E: PgExecutor<'e>,
1013{
1014 complete_external_inner(executor, callback_id, payload, run_lease, false).await
1015}
1016
1017pub async fn resume_external<'e, E>(
1023 executor: E,
1024 callback_id: Uuid,
1025 payload: Option<serde_json::Value>,
1026 run_lease: Option<i64>,
1027) -> Result<JobRow, AwaError>
1028where
1029 E: PgExecutor<'e>,
1030{
1031 complete_external_inner(executor, callback_id, payload, run_lease, true).await
1032}
1033
1034async fn complete_external_inner<'e, E>(
1035 executor: E,
1036 callback_id: Uuid,
1037 payload: Option<serde_json::Value>,
1038 run_lease: Option<i64>,
1039 resume: bool,
1040) -> Result<JobRow, AwaError>
1041where
1042 E: PgExecutor<'e>,
1043{
1044 let row = if resume {
1045 let payload_json = payload.unwrap_or(serde_json::Value::Null);
1048 sqlx::query_as::<_, JobRow>(
1049 r#"
1050 UPDATE awa.jobs
1051 SET state = 'running',
1052 callback_id = NULL,
1053 callback_timeout_at = NULL,
1054 callback_filter = NULL,
1055 callback_on_complete = NULL,
1056 callback_on_fail = NULL,
1057 callback_transform = NULL,
1058 heartbeat_at = now(),
1059 metadata = metadata || jsonb_build_object('_awa_callback_result', $3::jsonb)
1060 WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
1061 AND ($2::bigint IS NULL OR run_lease = $2)
1062 RETURNING *
1063 "#,
1064 )
1065 .bind(callback_id)
1066 .bind(run_lease)
1067 .bind(&payload_json)
1068 .fetch_optional(executor)
1069 .await?
1070 } else {
1071 sqlx::query_as::<_, JobRow>(
1073 r#"
1074 UPDATE awa.jobs
1075 SET state = 'completed',
1076 finalized_at = now(),
1077 callback_id = NULL,
1078 callback_timeout_at = NULL,
1079 callback_filter = NULL,
1080 callback_on_complete = NULL,
1081 callback_on_fail = NULL,
1082 callback_transform = NULL,
1083 heartbeat_at = NULL,
1084 deadline_at = NULL,
1085 progress = NULL
1086 WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
1087 AND ($2::bigint IS NULL OR run_lease = $2)
1088 RETURNING *
1089 "#,
1090 )
1091 .bind(callback_id)
1092 .bind(run_lease)
1093 .fetch_optional(executor)
1094 .await?
1095 };
1096
1097 row.ok_or(AwaError::CallbackNotFound {
1098 callback_id: callback_id.to_string(),
1099 })
1100}
1101
1102pub async fn fail_external<'e, E>(
1106 executor: E,
1107 callback_id: Uuid,
1108 error: &str,
1109 run_lease: Option<i64>,
1110) -> Result<JobRow, AwaError>
1111where
1112 E: PgExecutor<'e>,
1113{
1114 let row = sqlx::query_as::<_, JobRow>(
1115 r#"
1116 UPDATE awa.jobs
1117 SET state = 'failed',
1118 finalized_at = now(),
1119 callback_id = NULL,
1120 callback_timeout_at = NULL,
1121 callback_filter = NULL,
1122 callback_on_complete = NULL,
1123 callback_on_fail = NULL,
1124 callback_transform = NULL,
1125 heartbeat_at = NULL,
1126 deadline_at = NULL,
1127 errors = errors || jsonb_build_object(
1128 'error', $2::text,
1129 'attempt', attempt,
1130 'at', now()
1131 )::jsonb
1132 WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
1133 AND ($3::bigint IS NULL OR run_lease = $3)
1134 RETURNING *
1135 "#,
1136 )
1137 .bind(callback_id)
1138 .bind(error)
1139 .bind(run_lease)
1140 .fetch_optional(executor)
1141 .await?;
1142
1143 row.ok_or(AwaError::CallbackNotFound {
1144 callback_id: callback_id.to_string(),
1145 })
1146}
1147
1148pub async fn retry_external<'e, E>(
1158 executor: E,
1159 callback_id: Uuid,
1160 run_lease: Option<i64>,
1161) -> Result<JobRow, AwaError>
1162where
1163 E: PgExecutor<'e>,
1164{
1165 let row = sqlx::query_as::<_, JobRow>(
1166 r#"
1167 UPDATE awa.jobs
1168 SET state = 'available',
1169 attempt = 0,
1170 run_at = now(),
1171 finalized_at = NULL,
1172 callback_id = NULL,
1173 callback_timeout_at = NULL,
1174 callback_filter = NULL,
1175 callback_on_complete = NULL,
1176 callback_on_fail = NULL,
1177 callback_transform = NULL,
1178 heartbeat_at = NULL,
1179 deadline_at = NULL
1180 WHERE callback_id = $1 AND state = 'waiting_external'
1181 AND ($2::bigint IS NULL OR run_lease = $2)
1182 RETURNING *
1183 "#,
1184 )
1185 .bind(callback_id)
1186 .bind(run_lease)
1187 .fetch_optional(executor)
1188 .await?;
1189
1190 row.ok_or(AwaError::CallbackNotFound {
1191 callback_id: callback_id.to_string(),
1192 })
1193}
1194
1195pub async fn heartbeat_callback<'e, E>(
1204 executor: E,
1205 callback_id: Uuid,
1206 timeout: std::time::Duration,
1207) -> Result<JobRow, AwaError>
1208where
1209 E: PgExecutor<'e>,
1210{
1211 let timeout_secs = timeout.as_secs_f64();
1212 let row = sqlx::query_as::<_, JobRow>(
1213 r#"
1214 UPDATE awa.jobs
1215 SET callback_timeout_at = now() + make_interval(secs => $2)
1216 WHERE callback_id = $1 AND state = 'waiting_external'
1217 RETURNING *
1218 "#,
1219 )
1220 .bind(callback_id)
1221 .bind(timeout_secs)
1222 .fetch_optional(executor)
1223 .await?;
1224
1225 row.ok_or(AwaError::CallbackNotFound {
1226 callback_id: callback_id.to_string(),
1227 })
1228}
1229
1230pub async fn cancel_callback<'e, E>(
1236 executor: E,
1237 job_id: i64,
1238 run_lease: i64,
1239) -> Result<bool, AwaError>
1240where
1241 E: PgExecutor<'e>,
1242{
1243 let result = sqlx::query(
1244 r#"
1245 UPDATE awa.jobs
1246 SET callback_id = NULL,
1247 callback_timeout_at = NULL,
1248 callback_filter = NULL,
1249 callback_on_complete = NULL,
1250 callback_on_fail = NULL,
1251 callback_transform = NULL
1252 WHERE id = $1 AND callback_id IS NOT NULL AND state = 'running' AND run_lease = $2
1253 "#,
1254 )
1255 .bind(job_id)
1256 .bind(run_lease)
1257 .execute(executor)
1258 .await?;
1259
1260 Ok(result.rows_affected() > 0)
1261}
1262
/// Outcome of polling a job for callback resolution via
/// `check_callback_state`.
#[derive(Debug)]
pub enum CallbackPollResult {
    /// The job is `running` again with no callback id and a stored
    /// `_awa_callback_result`; carries that payload.
    Resolved(serde_json::Value),
    /// The job is still `waiting_external` on the polled callback id.
    Pending,
    /// The job now carries a different callback id than the one polled for.
    Stale {
        /// The callback id that was polled for.
        token: Uuid,
        /// The callback id currently stored on the job.
        current: Uuid,
        /// The job's current state.
        state: JobState,
    },
    /// The job exists but matches none of the cases above.
    UnexpectedState { token: Uuid, state: JobState },
    /// No job with the given id exists.
    NotFound,
}
1287
1288pub async fn enter_callback_wait(
1293 pool: &PgPool,
1294 job_id: i64,
1295 run_lease: i64,
1296 callback_id: Uuid,
1297) -> Result<bool, AwaError> {
1298 let result = sqlx::query(
1299 r#"
1300 UPDATE awa.jobs
1301 SET state = 'waiting_external',
1302 heartbeat_at = NULL,
1303 deadline_at = NULL
1304 WHERE id = $1 AND state = 'running' AND run_lease = $2 AND callback_id = $3
1305 "#,
1306 )
1307 .bind(job_id)
1308 .bind(run_lease)
1309 .bind(callback_id)
1310 .execute(pool)
1311 .await?;
1312
1313 Ok(result.rows_affected() > 0)
1314}
1315
1316pub async fn check_callback_state(
1322 pool: &PgPool,
1323 job_id: i64,
1324 callback_id: Uuid,
1325) -> Result<CallbackPollResult, AwaError> {
1326 let row: Option<(JobState, Option<Uuid>, serde_json::Value)> =
1327 sqlx::query_as("SELECT state, callback_id, metadata FROM awa.jobs WHERE id = $1")
1328 .bind(job_id)
1329 .fetch_optional(pool)
1330 .await?;
1331
1332 match row {
1333 Some((JobState::Running, None, metadata))
1334 if metadata.get("_awa_callback_result").is_some() =>
1335 {
1336 let payload = take_callback_payload(pool, job_id, metadata).await?;
1337 Ok(CallbackPollResult::Resolved(payload))
1338 }
1339 Some((state, Some(current_callback_id), _)) if current_callback_id != callback_id => {
1340 Ok(CallbackPollResult::Stale {
1341 token: callback_id,
1342 current: current_callback_id,
1343 state,
1344 })
1345 }
1346 Some((JobState::WaitingExternal, Some(current), _)) if current == callback_id => {
1347 Ok(CallbackPollResult::Pending)
1348 }
1349 Some((state, _, _)) => Ok(CallbackPollResult::UnexpectedState {
1350 token: callback_id,
1351 state,
1352 }),
1353 None => Ok(CallbackPollResult::NotFound),
1354 }
1355}
1356
1357pub async fn take_callback_payload(
1359 pool: &PgPool,
1360 job_id: i64,
1361 metadata: serde_json::Value,
1362) -> Result<serde_json::Value, AwaError> {
1363 let payload = metadata
1364 .get("_awa_callback_result")
1365 .cloned()
1366 .unwrap_or(serde_json::Value::Null);
1367
1368 sqlx::query("UPDATE awa.jobs SET metadata = metadata - '_awa_callback_result' WHERE id = $1")
1369 .bind(job_id)
1370 .execute(pool)
1371 .await?;
1372
1373 Ok(payload)
1374}
1375
/// Optional expressions attached to a callback registration.
///
/// Every field is an expression source string; `None` means the expression is
/// not configured. The default value has all four fields unset.
#[derive(Debug, Clone, Default)]
pub struct CallbackConfig {
    /// Expression stored in the job's `callback_filter` column.
    pub filter: Option<String>,
    /// Expression stored in the job's `callback_on_complete` column.
    pub on_complete: Option<String>,
    /// Expression stored in the job's `callback_on_fail` column.
    pub on_fail: Option<String>,
    /// Expression stored in the job's `callback_transform` column.
    pub transform: Option<String>,
}

impl CallbackConfig {
    /// Returns `true` when none of the four expressions is set.
    pub fn is_empty(&self) -> bool {
        [
            &self.filter,
            &self.on_complete,
            &self.on_fail,
            &self.transform,
        ]
        .iter()
        .all(|slot| slot.is_none())
    }
}
1403
/// Action taken when resolving a callback if no configured expression decides
/// the outcome (see `apply_default`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DefaultAction {
    /// Complete the job, passing the callback payload through unchanged.
    Complete,
    /// Fail the job with the message "callback failed: default action".
    Fail,
    /// Leave the job untouched and report the callback as ignored.
    Ignore,
}
1412
1413#[derive(Debug)]
1415pub enum ResolveOutcome {
1416 Completed {
1417 payload: Option<serde_json::Value>,
1418 job: JobRow,
1419 },
1420 Failed {
1421 job: JobRow,
1422 },
1423 Ignored {
1424 reason: String,
1425 },
1426}
1427
1428impl ResolveOutcome {
1429 pub fn is_completed(&self) -> bool {
1430 matches!(self, ResolveOutcome::Completed { .. })
1431 }
1432 pub fn is_failed(&self) -> bool {
1433 matches!(self, ResolveOutcome::Failed { .. })
1434 }
1435 pub fn is_ignored(&self) -> bool {
1436 matches!(self, ResolveOutcome::Ignored { .. })
1437 }
1438}
1439
1440pub async fn register_callback_with_config<'e, E>(
1449 executor: E,
1450 job_id: i64,
1451 run_lease: i64,
1452 timeout: std::time::Duration,
1453 config: &CallbackConfig,
1454) -> Result<Uuid, AwaError>
1455where
1456 E: PgExecutor<'e>,
1457{
1458 #[cfg(feature = "cel")]
1460 {
1461 for (name, expr) in [
1462 ("filter", &config.filter),
1463 ("on_complete", &config.on_complete),
1464 ("on_fail", &config.on_fail),
1465 ("transform", &config.transform),
1466 ] {
1467 if let Some(src) = expr {
1468 let program = cel::Program::compile(src).map_err(|e| {
1469 AwaError::Validation(format!("invalid CEL expression for {name}: {e}"))
1470 })?;
1471
1472 let refs = program.references();
1476 let bad_vars: Vec<&str> = refs
1477 .variables()
1478 .into_iter()
1479 .filter(|v| *v != "payload")
1480 .collect();
1481 if !bad_vars.is_empty() {
1482 return Err(AwaError::Validation(format!(
1483 "CEL expression for {name} references undeclared variable(s): {}; \
1484 only 'payload' is available",
1485 bad_vars.join(", ")
1486 )));
1487 }
1488 }
1489 }
1490 }
1491
1492 #[cfg(not(feature = "cel"))]
1493 {
1494 if !config.is_empty() {
1495 return Err(AwaError::Validation(
1496 "CEL expressions require the 'cel' feature".into(),
1497 ));
1498 }
1499 }
1500
1501 let callback_id = Uuid::new_v4();
1502 let timeout_secs = timeout.as_secs_f64();
1503
1504 let result = sqlx::query(
1505 r#"UPDATE awa.jobs
1506 SET callback_id = $2,
1507 callback_timeout_at = now() + make_interval(secs => $3),
1508 callback_filter = $4,
1509 callback_on_complete = $5,
1510 callback_on_fail = $6,
1511 callback_transform = $7
1512 WHERE id = $1 AND state = 'running' AND run_lease = $8"#,
1513 )
1514 .bind(job_id)
1515 .bind(callback_id)
1516 .bind(timeout_secs)
1517 .bind(&config.filter)
1518 .bind(&config.on_complete)
1519 .bind(&config.on_fail)
1520 .bind(&config.transform)
1521 .bind(run_lease)
1522 .execute(executor)
1523 .await?;
1524
1525 if result.rows_affected() == 0 {
1526 return Err(AwaError::Validation("job is not in running state".into()));
1527 }
1528 Ok(callback_id)
1529}
1530
/// Internal decision produced before a callback resolution is applied to the
/// job row.
enum ResolveAction {
    /// Complete the job; carries the payload to report (if any).
    Complete(Option<serde_json::Value>),
    /// Fail the job.
    Fail {
        /// Error message recorded on the job.
        error: String,
        /// Source of the expression that triggered the failure, when one did.
        expression: Option<String>,
    },
    /// Do not modify the job; carries the reason.
    Ignore(String),
}
1540
1541pub async fn resolve_callback(
1547 pool: &PgPool,
1548 callback_id: Uuid,
1549 payload: Option<serde_json::Value>,
1550 default_action: DefaultAction,
1551 run_lease: Option<i64>,
1552) -> Result<ResolveOutcome, AwaError> {
1553 let mut tx = pool.begin().await?;
1554
1555 let job = sqlx::query_as::<_, JobRow>(
1564 "SELECT * FROM awa.jobs_hot WHERE callback_id = $1
1565 AND state IN ('waiting_external', 'running')
1566 AND ($2::bigint IS NULL OR run_lease = $2)
1567 FOR UPDATE",
1568 )
1569 .bind(callback_id)
1570 .bind(run_lease)
1571 .fetch_optional(&mut *tx)
1572 .await?
1573 .ok_or(AwaError::CallbackNotFound {
1574 callback_id: callback_id.to_string(),
1575 })?;
1576
1577 let action = evaluate_or_default(&job, &payload, default_action)?;
1578
1579 match action {
1580 ResolveAction::Complete(transformed_payload) => {
1581 let completed_job = sqlx::query_as::<_, JobRow>(
1582 r#"
1583 UPDATE awa.jobs
1584 SET state = 'completed',
1585 finalized_at = now(),
1586 callback_id = NULL,
1587 callback_timeout_at = NULL,
1588 callback_filter = NULL,
1589 callback_on_complete = NULL,
1590 callback_on_fail = NULL,
1591 callback_transform = NULL,
1592 heartbeat_at = NULL,
1593 deadline_at = NULL,
1594 progress = NULL
1595 WHERE id = $1
1596 RETURNING *
1597 "#,
1598 )
1599 .bind(job.id)
1600 .fetch_one(&mut *tx)
1601 .await?;
1602
1603 tx.commit().await?;
1604 Ok(ResolveOutcome::Completed {
1605 payload: transformed_payload,
1606 job: completed_job,
1607 })
1608 }
1609 ResolveAction::Fail { error, expression } => {
1610 let mut error_json = serde_json::json!({
1611 "error": error,
1612 "attempt": job.attempt,
1613 "at": chrono::Utc::now().to_rfc3339(),
1614 });
1615 if let Some(expr) = expression {
1616 error_json["expression"] = serde_json::Value::String(expr);
1617 }
1618
1619 let failed_job = sqlx::query_as::<_, JobRow>(
1620 r#"
1621 UPDATE awa.jobs
1622 SET state = 'failed',
1623 finalized_at = now(),
1624 callback_id = NULL,
1625 callback_timeout_at = NULL,
1626 callback_filter = NULL,
1627 callback_on_complete = NULL,
1628 callback_on_fail = NULL,
1629 callback_transform = NULL,
1630 heartbeat_at = NULL,
1631 deadline_at = NULL,
1632 errors = errors || $2::jsonb
1633 WHERE id = $1
1634 RETURNING *
1635 "#,
1636 )
1637 .bind(job.id)
1638 .bind(error_json)
1639 .fetch_one(&mut *tx)
1640 .await?;
1641
1642 tx.commit().await?;
1643 Ok(ResolveOutcome::Failed { job: failed_job })
1644 }
1645 ResolveAction::Ignore(reason) => {
1646 Ok(ResolveOutcome::Ignored { reason })
1648 }
1649 }
1650}
1651
1652fn evaluate_or_default(
1654 job: &JobRow,
1655 payload: &Option<serde_json::Value>,
1656 default_action: DefaultAction,
1657) -> Result<ResolveAction, AwaError> {
1658 let has_expressions = job.callback_filter.is_some()
1659 || job.callback_on_complete.is_some()
1660 || job.callback_on_fail.is_some()
1661 || job.callback_transform.is_some();
1662
1663 if !has_expressions {
1664 return Ok(apply_default(default_action, payload));
1665 }
1666
1667 #[cfg(feature = "cel")]
1668 {
1669 Ok(evaluate_cel(job, payload, default_action))
1670 }
1671
1672 #[cfg(not(feature = "cel"))]
1673 {
1674 let _ = (payload, default_action);
1677 Err(AwaError::Validation(
1678 "CEL expressions present but 'cel' feature is not enabled".into(),
1679 ))
1680 }
1681}
1682
1683fn apply_default(
1684 default_action: DefaultAction,
1685 payload: &Option<serde_json::Value>,
1686) -> ResolveAction {
1687 match default_action {
1688 DefaultAction::Complete => ResolveAction::Complete(payload.clone()),
1689 DefaultAction::Fail => ResolveAction::Fail {
1690 error: "callback failed: default action".to_string(),
1691 expression: None,
1692 },
1693 DefaultAction::Ignore => {
1694 ResolveAction::Ignore("no expressions configured, default is ignore".to_string())
1695 }
1696 }
1697}
1698
/// Evaluates the job's callback expressions against the payload.
///
/// Order of evaluation:
///
/// 1. `callback_filter`: only an explicit `false` ignores the callback; an
///    evaluation error lets the callback through.
/// 2. `callback_on_fail`: an explicit `true` fails the job.
/// 3. `callback_on_complete`: an explicit `true` completes the job with the
///    payload passed through `apply_transform`.
/// 4. Otherwise the result is `apply_default(default_action, payload)`.
///
/// A missing payload is evaluated as JSON `null`.
#[cfg(feature = "cel")]
fn evaluate_cel(
    job: &JobRow,
    payload: &Option<serde_json::Value>,
    default_action: DefaultAction,
) -> ResolveAction {
    let input = payload.clone().unwrap_or(serde_json::Value::Null);

    if let Some(filter_expr) = &job.callback_filter {
        // Errors and `true` both fall through; only `false` rejects.
        if eval_bool(filter_expr, &input, job.id, "filter") == Ok(false) {
            return ResolveAction::Ignore("filter expression returned false".to_string());
        }
    }

    if let Some(on_fail_expr) = &job.callback_on_fail {
        // Errors are treated like `false` here.
        if eval_bool(on_fail_expr, &input, job.id, "on_fail") == Ok(true) {
            return ResolveAction::Fail {
                error: "callback failed: on_fail expression matched".to_string(),
                expression: Some(on_fail_expr.clone()),
            };
        }
    }

    if let Some(on_complete_expr) = &job.callback_on_complete {
        // Errors are treated like `false` here as well.
        if eval_bool(on_complete_expr, &input, job.id, "on_complete") == Ok(true) {
            return ResolveAction::Complete(Some(apply_transform(job, &input)));
        }
    }

    apply_default(default_action, payload)
}
1754
/// Compiles and runs `expression` with `payload_value` bound to the `payload`
/// variable, expecting a boolean result.
///
/// Returns `Ok(b)` when the expression evaluates to a boolean. Compilation
/// failures, context setup failures, execution errors, and non-boolean results
/// are logged at warn level and reported as `Err(())`. `job_id` and
/// `expression_name` are used only as log fields.
#[cfg(feature = "cel")]
fn eval_bool(
    expression: &str,
    payload_value: &serde_json::Value,
    job_id: i64,
    expression_name: &str,
) -> Result<bool, ()> {
    let program = cel::Program::compile(expression).map_err(|e| {
        tracing::warn!(
            job_id,
            expression_name,
            expression,
            error = %e,
            "CEL compilation error during evaluation"
        );
    })?;

    let mut context = cel::Context::default();
    context
        .add_variable("payload", payload_value.clone())
        .map_err(|e| {
            tracing::warn!(
                job_id,
                expression_name,
                error = %e,
                "Failed to add payload variable to CEL context"
            );
        })?;

    match program.execute(&context) {
        Ok(cel::Value::Bool(verdict)) => return Ok(verdict),
        Ok(other) => {
            tracing::warn!(
                job_id,
                expression_name,
                expression,
                result_type = ?other.type_of(),
                "CEL expression returned non-bool"
            );
        }
        Err(e) => {
            tracing::warn!(
                job_id,
                expression_name,
                expression,
                error = %e,
                "CEL execution error"
            );
        }
    }

    // Every non-boolean outcome has been logged above.
    Err(())
}
1811
/// Applies the job's `callback_transform` expression to `payload_value`.
///
/// Returns a clone of the original payload when no transform is configured or
/// when the transform fails at any stage (compilation, context setup,
/// execution, or JSON conversion); failures are logged at warn level.
#[cfg(feature = "cel")]
fn apply_transform(job: &JobRow, payload_value: &serde_json::Value) -> serde_json::Value {
    job.callback_transform
        .as_ref()
        .and_then(|transform_expr| run_transform_expr(job.id, transform_expr, payload_value))
        .unwrap_or_else(|| payload_value.clone())
}

/// Runs one transform expression; `None` means the caller should fall back to
/// the original payload (the cause has already been logged).
#[cfg(feature = "cel")]
fn run_transform_expr(
    job_id: i64,
    transform_expr: &String,
    payload_value: &serde_json::Value,
) -> Option<serde_json::Value> {
    let program = match cel::Program::compile(transform_expr) {
        Ok(compiled) => compiled,
        Err(e) => {
            tracing::warn!(
                job_id,
                expression = transform_expr,
                error = %e,
                "CEL transform compilation error, using original payload"
            );
            return None;
        }
    };

    let mut context = cel::Context::default();
    if let Err(e) = context.add_variable("payload", payload_value.clone()) {
        tracing::warn!(
            job_id,
            error = %e,
            "Failed to add payload variable for transform"
        );
        return None;
    }

    let value = match program.execute(&context) {
        Ok(value) => value,
        Err(e) => {
            tracing::warn!(
                job_id,
                expression = transform_expr,
                error = %e,
                "CEL transform execution error, using original payload"
            );
            return None;
        }
    };

    match value.json() {
        Ok(json) => Some(json),
        Err(e) => {
            tracing::warn!(
                job_id,
                expression = transform_expr,
                error = %e,
                "CEL transform result could not be converted to JSON, using original payload"
            );
            None
        }
    }
}