1use crate::error::AwaError;
2use crate::job::{JobRow, JobState};
3use serde::Serialize;
4use sqlx::PgExecutor;
5use sqlx::PgPool;
6use std::collections::HashMap;
7use uuid::Uuid;
8
9pub async fn retry<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
11where
12 E: PgExecutor<'e>,
13{
14 sqlx::query_as::<_, JobRow>(
15 r#"
16 UPDATE awa.jobs
17 SET state = 'available', attempt = 0, run_at = now(),
18 finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
19 callback_id = NULL, callback_timeout_at = NULL,
20 callback_filter = NULL, callback_on_complete = NULL,
21 callback_on_fail = NULL, callback_transform = NULL
22 WHERE id = $1 AND state IN ('failed', 'cancelled', 'waiting_external')
23 RETURNING *
24 "#,
25 )
26 .bind(job_id)
27 .fetch_optional(executor)
28 .await?
29 .ok_or(AwaError::JobNotFound { id: job_id })
30 .map(Some)
31}
32
33pub async fn cancel<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
35where
36 E: PgExecutor<'e>,
37{
38 sqlx::query_as::<_, JobRow>(
39 r#"
40 UPDATE awa.jobs
41 SET state = 'cancelled', finalized_at = now(),
42 callback_id = NULL, callback_timeout_at = NULL,
43 callback_filter = NULL, callback_on_complete = NULL,
44 callback_on_fail = NULL, callback_transform = NULL
45 WHERE id = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
46 RETURNING *
47 "#,
48 )
49 .bind(job_id)
50 .fetch_optional(executor)
51 .await?
52 .ok_or(AwaError::JobNotFound { id: job_id })
53 .map(Some)
54}
55
56pub async fn retry_failed_by_kind<'e, E>(executor: E, kind: &str) -> Result<Vec<JobRow>, AwaError>
58where
59 E: PgExecutor<'e>,
60{
61 let rows = sqlx::query_as::<_, JobRow>(
62 r#"
63 UPDATE awa.jobs
64 SET state = 'available', attempt = 0, run_at = now(),
65 finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
66 WHERE kind = $1 AND state = 'failed'
67 RETURNING *
68 "#,
69 )
70 .bind(kind)
71 .fetch_all(executor)
72 .await?;
73
74 Ok(rows)
75}
76
77pub async fn retry_failed_by_queue<'e, E>(executor: E, queue: &str) -> Result<Vec<JobRow>, AwaError>
79where
80 E: PgExecutor<'e>,
81{
82 let rows = sqlx::query_as::<_, JobRow>(
83 r#"
84 UPDATE awa.jobs
85 SET state = 'available', attempt = 0, run_at = now(),
86 finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
87 WHERE queue = $1 AND state = 'failed'
88 RETURNING *
89 "#,
90 )
91 .bind(queue)
92 .fetch_all(executor)
93 .await?;
94
95 Ok(rows)
96}
97
98pub async fn discard_failed<'e, E>(executor: E, kind: &str) -> Result<u64, AwaError>
100where
101 E: PgExecutor<'e>,
102{
103 let result = sqlx::query("DELETE FROM awa.jobs WHERE kind = $1 AND state = 'failed'")
104 .bind(kind)
105 .execute(executor)
106 .await?;
107
108 Ok(result.rows_affected())
109}
110
111pub async fn pause_queue<'e, E>(
113 executor: E,
114 queue: &str,
115 paused_by: Option<&str>,
116) -> Result<(), AwaError>
117where
118 E: PgExecutor<'e>,
119{
120 sqlx::query(
121 r#"
122 INSERT INTO awa.queue_meta (queue, paused, paused_at, paused_by)
123 VALUES ($1, TRUE, now(), $2)
124 ON CONFLICT (queue) DO UPDATE SET paused = TRUE, paused_at = now(), paused_by = $2
125 "#,
126 )
127 .bind(queue)
128 .bind(paused_by)
129 .execute(executor)
130 .await?;
131
132 Ok(())
133}
134
135pub async fn resume_queue<'e, E>(executor: E, queue: &str) -> Result<(), AwaError>
137where
138 E: PgExecutor<'e>,
139{
140 sqlx::query("UPDATE awa.queue_meta SET paused = FALSE WHERE queue = $1")
141 .bind(queue)
142 .execute(executor)
143 .await?;
144
145 Ok(())
146}
147
148pub async fn drain_queue<'e, E>(executor: E, queue: &str) -> Result<u64, AwaError>
150where
151 E: PgExecutor<'e>,
152{
153 let result = sqlx::query(
154 r#"
155 UPDATE awa.jobs
156 SET state = 'cancelled', finalized_at = now(),
157 callback_id = NULL, callback_timeout_at = NULL,
158 callback_filter = NULL, callback_on_complete = NULL,
159 callback_on_fail = NULL, callback_transform = NULL
160 WHERE queue = $1 AND state IN ('available', 'scheduled', 'retryable', 'waiting_external')
161 "#,
162 )
163 .bind(queue)
164 .execute(executor)
165 .await?;
166
167 Ok(result.rows_affected())
168}
169
/// Per-queue summary row produced by `queue_stats`.
#[derive(Debug, Clone, Serialize)]
pub struct QueueStats {
    /// Queue name the counters belong to.
    pub queue: String,
    /// Number of jobs currently in the `available` state.
    pub available: i64,
    /// Number of jobs currently in the `running` state.
    pub running: i64,
    /// Number of jobs currently in the `failed` state.
    pub failed: i64,
    /// Number of jobs currently in the `waiting_external` state.
    pub waiting_external: i64,
    /// Number of `completed` jobs whose `finalized_at` is within the last hour.
    pub completed_last_hour: i64,
    /// Seconds between `now()` and the earliest `run_at` among `available`
    /// jobs; `None` when the queue has no `available` jobs.
    pub lag_seconds: Option<f64>,
    /// Whether the queue is flagged as paused in `awa.queue_meta`
    /// (`false` when the queue has no metadata row).
    pub paused: bool,
}
182
183pub async fn queue_stats<'e, E>(executor: E) -> Result<Vec<QueueStats>, AwaError>
185where
186 E: PgExecutor<'e>,
187{
188 let rows = sqlx::query_as::<_, (String, i64, i64, i64, i64, i64, Option<f64>, bool)>(
189 r#"
190 SELECT
191 j.queue,
192 count(*) FILTER (WHERE j.state = 'available') AS available,
193 count(*) FILTER (WHERE j.state = 'running') AS running,
194 count(*) FILTER (WHERE j.state = 'failed') AS failed,
195 count(*) FILTER (WHERE j.state = 'waiting_external') AS waiting_external,
196 count(*) FILTER (WHERE j.state = 'completed'
197 AND j.finalized_at > now() - interval '1 hour') AS completed_last_hour,
198 EXTRACT(EPOCH FROM (now() - min(j.run_at) FILTER (WHERE j.state = 'available')))::float8 AS lag_seconds,
199 COALESCE(qm.paused, FALSE) AS paused
200 FROM awa.jobs j
201 LEFT JOIN awa.queue_meta qm ON qm.queue = j.queue
202 GROUP BY j.queue, qm.paused
203 "#,
204 )
205 .fetch_all(executor)
206 .await?;
207
208 Ok(rows
209 .into_iter()
210 .map(
211 |(
212 queue,
213 available,
214 running,
215 failed,
216 waiting_external,
217 completed_last_hour,
218 lag_seconds,
219 paused,
220 )| QueueStats {
221 queue,
222 available,
223 running,
224 failed,
225 waiting_external,
226 completed_last_hour,
227 lag_seconds,
228 paused,
229 },
230 )
231 .collect())
232}
233
/// Optional filters and paging controls accepted by `list_jobs`.
///
/// Every field left as `None` is simply not applied; the `Default` value
/// therefore matches all jobs.
#[derive(Debug, Clone, Default, Serialize)]
pub struct ListJobsFilter {
    /// Only return jobs in this state.
    pub state: Option<JobState>,
    /// Only return jobs of this kind.
    pub kind: Option<String>,
    /// Only return jobs in this queue.
    pub queue: Option<String>,
    /// Only return jobs whose `tags` array contains this tag.
    pub tag: Option<String>,
    /// Only return jobs with an id strictly less than this value
    /// (results are ordered by id descending, so this acts as a page cursor).
    pub before_id: Option<i64>,
    /// Maximum number of rows to return; `list_jobs` uses 100 when unset.
    pub limit: Option<i64>,
}
244
245pub async fn list_jobs<'e, E>(executor: E, filter: &ListJobsFilter) -> Result<Vec<JobRow>, AwaError>
247where
248 E: PgExecutor<'e>,
249{
250 let limit = filter.limit.unwrap_or(100);
251
252 let rows = sqlx::query_as::<_, JobRow>(
253 r#"
254 SELECT * FROM awa.jobs
255 WHERE ($1::awa.job_state IS NULL OR state = $1)
256 AND ($2::text IS NULL OR kind = $2)
257 AND ($3::text IS NULL OR queue = $3)
258 AND ($4::text IS NULL OR tags @> ARRAY[$4]::text[])
259 AND ($5::bigint IS NULL OR id < $5)
260 ORDER BY id DESC
261 LIMIT $6
262 "#,
263 )
264 .bind(filter.state)
265 .bind(&filter.kind)
266 .bind(&filter.queue)
267 .bind(&filter.tag)
268 .bind(filter.before_id)
269 .bind(limit)
270 .fetch_all(executor)
271 .await?;
272
273 Ok(rows)
274}
275
276pub async fn get_job<'e, E>(executor: E, job_id: i64) -> Result<JobRow, AwaError>
278where
279 E: PgExecutor<'e>,
280{
281 let row = sqlx::query_as::<_, JobRow>("SELECT * FROM awa.jobs WHERE id = $1")
282 .bind(job_id)
283 .fetch_optional(executor)
284 .await?;
285
286 row.ok_or(AwaError::JobNotFound { id: job_id })
287}
288
289pub async fn state_counts<'e, E>(executor: E) -> Result<HashMap<JobState, i64>, AwaError>
291where
292 E: PgExecutor<'e>,
293{
294 let rows =
295 sqlx::query_as::<_, (JobState, i64)>("SELECT state, count(*) FROM awa.jobs GROUP BY state")
296 .fetch_all(executor)
297 .await?;
298
299 Ok(rows.into_iter().collect())
300}
301
302pub async fn distinct_kinds<'e, E>(executor: E) -> Result<Vec<String>, AwaError>
304where
305 E: PgExecutor<'e>,
306{
307 let rows = sqlx::query_scalar::<_, String>("SELECT DISTINCT kind FROM awa.jobs ORDER BY kind")
308 .fetch_all(executor)
309 .await?;
310
311 Ok(rows)
312}
313
314pub async fn distinct_queues<'e, E>(executor: E) -> Result<Vec<String>, AwaError>
316where
317 E: PgExecutor<'e>,
318{
319 let rows =
320 sqlx::query_scalar::<_, String>("SELECT DISTINCT queue FROM awa.jobs ORDER BY queue")
321 .fetch_all(executor)
322 .await?;
323
324 Ok(rows)
325}
326
327pub async fn bulk_retry<'e, E>(executor: E, ids: &[i64]) -> Result<Vec<JobRow>, AwaError>
329where
330 E: PgExecutor<'e>,
331{
332 let rows = sqlx::query_as::<_, JobRow>(
333 r#"
334 UPDATE awa.jobs
335 SET state = 'available', attempt = 0, run_at = now(),
336 finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
337 callback_id = NULL, callback_timeout_at = NULL,
338 callback_filter = NULL, callback_on_complete = NULL,
339 callback_on_fail = NULL, callback_transform = NULL
340 WHERE id = ANY($1) AND state IN ('failed', 'cancelled', 'waiting_external')
341 RETURNING *
342 "#,
343 )
344 .bind(ids)
345 .fetch_all(executor)
346 .await?;
347
348 Ok(rows)
349}
350
351pub async fn bulk_cancel<'e, E>(executor: E, ids: &[i64]) -> Result<Vec<JobRow>, AwaError>
353where
354 E: PgExecutor<'e>,
355{
356 let rows = sqlx::query_as::<_, JobRow>(
357 r#"
358 UPDATE awa.jobs
359 SET state = 'cancelled', finalized_at = now(),
360 callback_id = NULL, callback_timeout_at = NULL,
361 callback_filter = NULL, callback_on_complete = NULL,
362 callback_on_fail = NULL, callback_transform = NULL
363 WHERE id = ANY($1) AND state NOT IN ('completed', 'failed', 'cancelled')
364 RETURNING *
365 "#,
366 )
367 .bind(ids)
368 .fetch_all(executor)
369 .await?;
370
371 Ok(rows)
372}
373
/// One point of the series returned by `state_timeseries`: how many jobs
/// created within a given minute are currently in a given state.
#[derive(Debug, Clone, Serialize)]
pub struct StateTimeseriesBucket {
    /// Start of the one-minute bucket (`created_at` truncated to the minute).
    pub bucket: chrono::DateTime<chrono::Utc>,
    /// Job state this count applies to.
    pub state: JobState,
    /// Number of jobs in this bucket with this state.
    pub count: i64,
}
381
382pub async fn state_timeseries<'e, E>(
384 executor: E,
385 minutes: i32,
386) -> Result<Vec<StateTimeseriesBucket>, AwaError>
387where
388 E: PgExecutor<'e>,
389{
390 let rows = sqlx::query_as::<_, (chrono::DateTime<chrono::Utc>, JobState, i64)>(
391 r#"
392 SELECT
393 date_trunc('minute', created_at) AS bucket,
394 state,
395 count(*) AS count
396 FROM awa.jobs
397 WHERE created_at >= now() - make_interval(mins => $1)
398 GROUP BY bucket, state
399 ORDER BY bucket
400 "#,
401 )
402 .bind(minutes)
403 .fetch_all(executor)
404 .await?;
405
406 Ok(rows
407 .into_iter()
408 .map(|(bucket, state, count)| StateTimeseriesBucket {
409 bucket,
410 state,
411 count,
412 })
413 .collect())
414}
415
416pub async fn register_callback<'e, E>(
425 executor: E,
426 job_id: i64,
427 run_lease: i64,
428 timeout: std::time::Duration,
429) -> Result<Uuid, AwaError>
430where
431 E: PgExecutor<'e>,
432{
433 let callback_id = Uuid::new_v4();
434 let timeout_secs = timeout.as_secs_f64();
435 let result = sqlx::query(
436 r#"UPDATE awa.jobs
437 SET callback_id = $2,
438 callback_timeout_at = now() + make_interval(secs => $3),
439 callback_filter = NULL,
440 callback_on_complete = NULL,
441 callback_on_fail = NULL,
442 callback_transform = NULL
443 WHERE id = $1 AND state = 'running' AND run_lease = $4"#,
444 )
445 .bind(job_id)
446 .bind(callback_id)
447 .bind(timeout_secs)
448 .bind(run_lease)
449 .execute(executor)
450 .await?;
451 if result.rows_affected() == 0 {
452 return Err(AwaError::Validation("job is not in running state".into()));
453 }
454 Ok(callback_id)
455}
456
457pub async fn complete_external<'e, E>(
467 executor: E,
468 callback_id: Uuid,
469 _payload: Option<serde_json::Value>,
470) -> Result<JobRow, AwaError>
471where
472 E: PgExecutor<'e>,
473{
474 let row = sqlx::query_as::<_, JobRow>(
475 r#"
476 UPDATE awa.jobs
477 SET state = 'completed',
478 finalized_at = now(),
479 callback_id = NULL,
480 callback_timeout_at = NULL,
481 callback_filter = NULL,
482 callback_on_complete = NULL,
483 callback_on_fail = NULL,
484 callback_transform = NULL,
485 heartbeat_at = NULL,
486 deadline_at = NULL,
487 progress = NULL
488 WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
489 RETURNING *
490 "#,
491 )
492 .bind(callback_id)
493 .fetch_optional(executor)
494 .await?;
495
496 row.ok_or(AwaError::CallbackNotFound {
497 callback_id: callback_id.to_string(),
498 })
499}
500
501pub async fn fail_external<'e, E>(
505 executor: E,
506 callback_id: Uuid,
507 error: &str,
508) -> Result<JobRow, AwaError>
509where
510 E: PgExecutor<'e>,
511{
512 let row = sqlx::query_as::<_, JobRow>(
513 r#"
514 UPDATE awa.jobs
515 SET state = 'failed',
516 finalized_at = now(),
517 callback_id = NULL,
518 callback_timeout_at = NULL,
519 callback_filter = NULL,
520 callback_on_complete = NULL,
521 callback_on_fail = NULL,
522 callback_transform = NULL,
523 heartbeat_at = NULL,
524 deadline_at = NULL,
525 errors = errors || jsonb_build_object(
526 'error', $2::text,
527 'attempt', attempt,
528 'at', now()
529 )::jsonb
530 WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
531 RETURNING *
532 "#,
533 )
534 .bind(callback_id)
535 .bind(error)
536 .fetch_optional(executor)
537 .await?;
538
539 row.ok_or(AwaError::CallbackNotFound {
540 callback_id: callback_id.to_string(),
541 })
542}
543
544pub async fn retry_external<'e, E>(executor: E, callback_id: Uuid) -> Result<JobRow, AwaError>
554where
555 E: PgExecutor<'e>,
556{
557 let row = sqlx::query_as::<_, JobRow>(
558 r#"
559 UPDATE awa.jobs
560 SET state = 'available',
561 attempt = 0,
562 run_at = now(),
563 finalized_at = NULL,
564 callback_id = NULL,
565 callback_timeout_at = NULL,
566 callback_filter = NULL,
567 callback_on_complete = NULL,
568 callback_on_fail = NULL,
569 callback_transform = NULL,
570 heartbeat_at = NULL,
571 deadline_at = NULL
572 WHERE callback_id = $1 AND state = 'waiting_external'
573 RETURNING *
574 "#,
575 )
576 .bind(callback_id)
577 .fetch_optional(executor)
578 .await?;
579
580 row.ok_or(AwaError::CallbackNotFound {
581 callback_id: callback_id.to_string(),
582 })
583}
584
/// Optional expressions attached to a callback registration.
///
/// Each field holds the source text of one expression; `None` means that the
/// expression is not configured. The values are stored verbatim in the job's
/// `callback_filter`, `callback_on_complete`, `callback_on_fail`, and
/// `callback_transform` columns by `register_callback_with_config`.
#[derive(Debug, Clone, Default)]
pub struct CallbackConfig {
    /// Expression stored in `callback_filter`.
    pub filter: Option<String>,
    /// Expression stored in `callback_on_complete`.
    pub on_complete: Option<String>,
    /// Expression stored in `callback_on_fail`.
    pub on_fail: Option<String>,
    /// Expression stored in `callback_transform`.
    pub transform: Option<String>,
}

impl CallbackConfig {
    /// Returns `true` when none of the four expressions is configured.
    pub fn is_empty(&self) -> bool {
        [
            &self.filter,
            &self.on_complete,
            &self.on_fail,
            &self.transform,
        ]
        .iter()
        .all(|expression| expression.is_none())
    }
}
612
/// What `resolve_callback` should do when the job's expressions (if any) do
/// not decide the outcome.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DefaultAction {
    /// Complete the job, passing the payload through unchanged.
    Complete,
    /// Fail the job with a generic "default action" error.
    Fail,
    /// Leave the job untouched.
    Ignore,
}
621
/// Result of `resolve_callback`: what happened to the job.
#[derive(Debug)]
pub enum ResolveOutcome {
    /// The job was moved to `completed`.
    Completed {
        /// Payload associated with the completion (after any transform was
        /// applied by the expression path); `None` if no payload was supplied
        /// and the default action completed the job.
        payload: Option<serde_json::Value>,
        /// The job row as returned by the completing `UPDATE`.
        job: JobRow,
    },
    /// The job was moved to `failed`.
    Failed {
        /// The job row as returned by the failing `UPDATE`.
        job: JobRow,
    },
    /// The job was left unchanged.
    Ignored {
        /// Human-readable explanation of why nothing was done.
        reason: String,
    },
}
636
637impl ResolveOutcome {
638 pub fn is_completed(&self) -> bool {
639 matches!(self, ResolveOutcome::Completed { .. })
640 }
641 pub fn is_failed(&self) -> bool {
642 matches!(self, ResolveOutcome::Failed { .. })
643 }
644 pub fn is_ignored(&self) -> bool {
645 matches!(self, ResolveOutcome::Ignored { .. })
646 }
647}
648
649pub async fn register_callback_with_config<'e, E>(
658 executor: E,
659 job_id: i64,
660 run_lease: i64,
661 timeout: std::time::Duration,
662 config: &CallbackConfig,
663) -> Result<Uuid, AwaError>
664where
665 E: PgExecutor<'e>,
666{
667 #[cfg(feature = "cel")]
669 {
670 for (name, expr) in [
671 ("filter", &config.filter),
672 ("on_complete", &config.on_complete),
673 ("on_fail", &config.on_fail),
674 ("transform", &config.transform),
675 ] {
676 if let Some(src) = expr {
677 let program = cel::Program::compile(src).map_err(|e| {
678 AwaError::Validation(format!("invalid CEL expression for {name}: {e}"))
679 })?;
680
681 let refs = program.references();
685 let bad_vars: Vec<&str> = refs
686 .variables()
687 .into_iter()
688 .filter(|v| *v != "payload")
689 .collect();
690 if !bad_vars.is_empty() {
691 return Err(AwaError::Validation(format!(
692 "CEL expression for {name} references undeclared variable(s): {}; \
693 only 'payload' is available",
694 bad_vars.join(", ")
695 )));
696 }
697 }
698 }
699 }
700
701 #[cfg(not(feature = "cel"))]
702 {
703 if !config.is_empty() {
704 return Err(AwaError::Validation(
705 "CEL expressions require the 'cel' feature".into(),
706 ));
707 }
708 }
709
710 let callback_id = Uuid::new_v4();
711 let timeout_secs = timeout.as_secs_f64();
712
713 let result = sqlx::query(
714 r#"UPDATE awa.jobs
715 SET callback_id = $2,
716 callback_timeout_at = now() + make_interval(secs => $3),
717 callback_filter = $4,
718 callback_on_complete = $5,
719 callback_on_fail = $6,
720 callback_transform = $7
721 WHERE id = $1 AND state = 'running' AND run_lease = $8"#,
722 )
723 .bind(job_id)
724 .bind(callback_id)
725 .bind(timeout_secs)
726 .bind(&config.filter)
727 .bind(&config.on_complete)
728 .bind(&config.on_fail)
729 .bind(&config.transform)
730 .bind(run_lease)
731 .execute(executor)
732 .await?;
733
734 if result.rows_affected() == 0 {
735 return Err(AwaError::Validation("job is not in running state".into()));
736 }
737 Ok(callback_id)
738}
739
/// Internal decision produced by expression evaluation (or the default
/// action) before `resolve_callback` applies it to the database.
enum ResolveAction {
    /// Complete the job; carries the payload to report back to the caller.
    Complete(Option<serde_json::Value>),
    /// Fail the job.
    Fail {
        /// Error message recorded in the job's `errors` entry.
        error: String,
        /// Source of the expression that triggered the failure, if any;
        /// recorded under the `expression` key of the error entry.
        expression: Option<String>,
    },
    /// Leave the job untouched; carries the reason.
    Ignore(String),
}
749
750pub async fn resolve_callback(
756 pool: &PgPool,
757 callback_id: Uuid,
758 payload: Option<serde_json::Value>,
759 default_action: DefaultAction,
760) -> Result<ResolveOutcome, AwaError> {
761 let mut tx = pool.begin().await?;
762
763 let job = sqlx::query_as::<_, JobRow>(
768 "SELECT * FROM awa.jobs_hot WHERE callback_id = $1
769 AND state = 'waiting_external'
770 FOR UPDATE",
771 )
772 .bind(callback_id)
773 .fetch_optional(&mut *tx)
774 .await?
775 .ok_or(AwaError::CallbackNotFound {
776 callback_id: callback_id.to_string(),
777 })?;
778
779 let action = evaluate_or_default(&job, &payload, default_action)?;
780
781 match action {
782 ResolveAction::Complete(transformed_payload) => {
783 let completed_job = sqlx::query_as::<_, JobRow>(
784 r#"
785 UPDATE awa.jobs
786 SET state = 'completed',
787 finalized_at = now(),
788 callback_id = NULL,
789 callback_timeout_at = NULL,
790 callback_filter = NULL,
791 callback_on_complete = NULL,
792 callback_on_fail = NULL,
793 callback_transform = NULL,
794 heartbeat_at = NULL,
795 deadline_at = NULL,
796 progress = NULL
797 WHERE id = $1
798 RETURNING *
799 "#,
800 )
801 .bind(job.id)
802 .fetch_one(&mut *tx)
803 .await?;
804
805 tx.commit().await?;
806 Ok(ResolveOutcome::Completed {
807 payload: transformed_payload,
808 job: completed_job,
809 })
810 }
811 ResolveAction::Fail { error, expression } => {
812 let mut error_json = serde_json::json!({
813 "error": error,
814 "attempt": job.attempt,
815 "at": chrono::Utc::now().to_rfc3339(),
816 });
817 if let Some(expr) = expression {
818 error_json["expression"] = serde_json::Value::String(expr);
819 }
820
821 let failed_job = sqlx::query_as::<_, JobRow>(
822 r#"
823 UPDATE awa.jobs
824 SET state = 'failed',
825 finalized_at = now(),
826 callback_id = NULL,
827 callback_timeout_at = NULL,
828 callback_filter = NULL,
829 callback_on_complete = NULL,
830 callback_on_fail = NULL,
831 callback_transform = NULL,
832 heartbeat_at = NULL,
833 deadline_at = NULL,
834 errors = errors || $2::jsonb
835 WHERE id = $1
836 RETURNING *
837 "#,
838 )
839 .bind(job.id)
840 .bind(error_json)
841 .fetch_one(&mut *tx)
842 .await?;
843
844 tx.commit().await?;
845 Ok(ResolveOutcome::Failed { job: failed_job })
846 }
847 ResolveAction::Ignore(reason) => {
848 Ok(ResolveOutcome::Ignored { reason })
850 }
851 }
852}
853
854fn evaluate_or_default(
856 job: &JobRow,
857 payload: &Option<serde_json::Value>,
858 default_action: DefaultAction,
859) -> Result<ResolveAction, AwaError> {
860 let has_expressions = job.callback_filter.is_some()
861 || job.callback_on_complete.is_some()
862 || job.callback_on_fail.is_some()
863 || job.callback_transform.is_some();
864
865 if !has_expressions {
866 return Ok(apply_default(default_action, payload));
867 }
868
869 #[cfg(feature = "cel")]
870 {
871 Ok(evaluate_cel(job, payload, default_action))
872 }
873
874 #[cfg(not(feature = "cel"))]
875 {
876 let _ = (payload, default_action);
879 Err(AwaError::Validation(
880 "CEL expressions present but 'cel' feature is not enabled".into(),
881 ))
882 }
883}
884
885fn apply_default(
886 default_action: DefaultAction,
887 payload: &Option<serde_json::Value>,
888) -> ResolveAction {
889 match default_action {
890 DefaultAction::Complete => ResolveAction::Complete(payload.clone()),
891 DefaultAction::Fail => ResolveAction::Fail {
892 error: "callback failed: default action".to_string(),
893 expression: None,
894 },
895 DefaultAction::Ignore => {
896 ResolveAction::Ignore("no expressions configured, default is ignore".to_string())
897 }
898 }
899}
900
/// Evaluates the job's CEL expressions against the payload and picks an
/// action.
///
/// A missing payload is evaluated as JSON `null`. The expressions are checked
/// in this order, the first decisive one wins:
///
/// 1. `callback_filter` — if it evaluates to `false`, the callback is ignored.
///    An evaluation error does not cause an ignore; processing continues.
/// 2. `callback_on_fail` — if it evaluates to `true`, the job fails and the
///    expression source is attached to the failure.
/// 3. `callback_on_complete` — if it evaluates to `true`, the job completes
///    with the payload passed through `apply_transform`.
///
/// Evaluation errors in steps 2 and 3 are treated as "did not match". When
/// nothing was decisive, `default_action` is applied to the original payload.
#[cfg(feature = "cel")]
fn evaluate_cel(
    job: &JobRow,
    payload: &Option<serde_json::Value>,
    default_action: DefaultAction,
) -> ResolveAction {
    let payload_value = match payload {
        Some(value) => value.clone(),
        None => serde_json::Value::Null,
    };

    if let Some(filter_expr) = &job.callback_filter {
        let verdict = eval_bool(filter_expr, &payload_value, job.id, "filter");
        // Only an explicit `false` rejects the callback; `true` and errors
        // both continue to the next stage.
        if matches!(verdict, Ok(false)) {
            return ResolveAction::Ignore("filter expression returned false".to_string());
        }
    }

    if let Some(on_fail_expr) = &job.callback_on_fail {
        let verdict = eval_bool(on_fail_expr, &payload_value, job.id, "on_fail");
        if matches!(verdict, Ok(true)) {
            return ResolveAction::Fail {
                error: "callback failed: on_fail expression matched".to_string(),
                expression: Some(on_fail_expr.clone()),
            };
        }
    }

    if let Some(on_complete_expr) = &job.callback_on_complete {
        let verdict = eval_bool(on_complete_expr, &payload_value, job.id, "on_complete");
        if matches!(verdict, Ok(true)) {
            let transformed = apply_transform(job, &payload_value);
            return ResolveAction::Complete(Some(transformed));
        }
    }

    // No expression decided the outcome.
    apply_default(default_action, payload)
}
956
/// Compiles and runs a CEL expression that is expected to yield a boolean.
///
/// The expression sees a single variable, `payload`, bound to a clone of
/// `payload_value`. `job_id` and `expression_name` are used only as fields in
/// warning logs.
///
/// Returns `Ok(b)` when the expression evaluates to the boolean `b`.
///
/// # Errors
///
/// Returns `Err(())` — after emitting a `tracing` warning — when the
/// expression fails to compile, the payload cannot be added to the context,
/// execution fails, or the result is not a boolean.
#[cfg(feature = "cel")]
fn eval_bool(
    expression: &str,
    payload_value: &serde_json::Value,
    job_id: i64,
    expression_name: &str,
) -> Result<bool, ()> {
    let program = cel::Program::compile(expression).map_err(|e| {
        tracing::warn!(
            job_id,
            expression_name,
            expression,
            error = %e,
            "CEL compilation error during evaluation"
        );
    })?;

    let mut context = cel::Context::default();
    context
        .add_variable("payload", payload_value.clone())
        .map_err(|e| {
            tracing::warn!(
                job_id,
                expression_name,
                error = %e,
                "Failed to add payload variable to CEL context"
            );
        })?;

    let evaluated = program.execute(&context);
    match evaluated {
        Ok(cel::Value::Bool(verdict)) => Ok(verdict),
        Ok(non_bool) => {
            tracing::warn!(
                job_id,
                expression_name,
                expression,
                result_type = ?non_bool.type_of(),
                "CEL expression returned non-bool"
            );
            Err(())
        }
        Err(e) => {
            tracing::warn!(
                job_id,
                expression_name,
                expression,
                error = %e,
                "CEL execution error"
            );
            Err(())
        }
    }
}
1013
/// Applies the job's `callback_transform` expression to the payload.
///
/// When the job has no transform, a clone of `payload_value` is returned
/// unchanged. Otherwise the expression is compiled and executed with
/// `payload` bound to a clone of `payload_value`, and its result is converted
/// to JSON.
///
/// This function never fails: on a compilation error, a context error, an
/// execution error, or a result that cannot be converted to JSON, it logs a
/// `tracing` warning and falls back to a clone of the original payload.
#[cfg(feature = "cel")]
fn apply_transform(job: &JobRow, payload_value: &serde_json::Value) -> serde_json::Value {
    let transform_expr = match job.callback_transform.as_ref() {
        None => return payload_value.clone(),
        Some(expr) => expr,
    };

    let program = match cel::Program::compile(transform_expr) {
        Ok(compiled) => compiled,
        Err(e) => {
            tracing::warn!(
                job_id = job.id,
                expression = transform_expr,
                error = %e,
                "CEL transform compilation error, using original payload"
            );
            return payload_value.clone();
        }
    };

    let mut context = cel::Context::default();
    if let Err(e) = context.add_variable("payload", payload_value.clone()) {
        tracing::warn!(
            job_id = job.id,
            error = %e,
            "Failed to add payload variable for transform"
        );
        return payload_value.clone();
    }

    let value = match program.execute(&context) {
        Ok(value) => value,
        Err(e) => {
            tracing::warn!(
                job_id = job.id,
                expression = transform_expr,
                error = %e,
                "CEL transform execution error, using original payload"
            );
            return payload_value.clone();
        }
    };

    match value.json() {
        Ok(json) => json,
        Err(e) => {
            tracing::warn!(
                job_id = job.id,
                expression = transform_expr,
                error = %e,
                "CEL transform result could not be converted to JSON, using original payload"
            );
            payload_value.clone()
        }
    }
}