Skip to main content

awa_model/
admin.rs

1use crate::error::AwaError;
2use crate::job::{JobRow, JobState};
3use serde::Serialize;
4use sqlx::PgExecutor;
5use sqlx::PgPool;
6use std::collections::HashMap;
7use uuid::Uuid;
8
9/// Retry a single failed, cancelled, or waiting_external job.
10pub async fn retry<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
11where
12    E: PgExecutor<'e>,
13{
14    sqlx::query_as::<_, JobRow>(
15        r#"
16        UPDATE awa.jobs
17        SET state = 'available', attempt = 0, run_at = now(),
18            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
19            callback_id = NULL, callback_timeout_at = NULL,
20            callback_filter = NULL, callback_on_complete = NULL,
21            callback_on_fail = NULL, callback_transform = NULL
22        WHERE id = $1 AND state IN ('failed', 'cancelled', 'waiting_external')
23        RETURNING *
24        "#,
25    )
26    .bind(job_id)
27    .fetch_optional(executor)
28    .await?
29    .ok_or(AwaError::JobNotFound { id: job_id })
30    .map(Some)
31}
32
33/// Cancel a single non-terminal job.
34pub async fn cancel<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
35where
36    E: PgExecutor<'e>,
37{
38    sqlx::query_as::<_, JobRow>(
39        r#"
40        UPDATE awa.jobs
41        SET state = 'cancelled', finalized_at = now(),
42            callback_id = NULL, callback_timeout_at = NULL,
43            callback_filter = NULL, callback_on_complete = NULL,
44            callback_on_fail = NULL, callback_transform = NULL
45        WHERE id = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
46        RETURNING *
47        "#,
48    )
49    .bind(job_id)
50    .fetch_optional(executor)
51    .await?
52    .ok_or(AwaError::JobNotFound { id: job_id })
53    .map(Some)
54}
55
56/// Retry all failed jobs of a given kind.
57pub async fn retry_failed_by_kind<'e, E>(executor: E, kind: &str) -> Result<Vec<JobRow>, AwaError>
58where
59    E: PgExecutor<'e>,
60{
61    let rows = sqlx::query_as::<_, JobRow>(
62        r#"
63        UPDATE awa.jobs
64        SET state = 'available', attempt = 0, run_at = now(),
65            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
66        WHERE kind = $1 AND state = 'failed'
67        RETURNING *
68        "#,
69    )
70    .bind(kind)
71    .fetch_all(executor)
72    .await?;
73
74    Ok(rows)
75}
76
77/// Retry all failed jobs in a given queue.
78pub async fn retry_failed_by_queue<'e, E>(executor: E, queue: &str) -> Result<Vec<JobRow>, AwaError>
79where
80    E: PgExecutor<'e>,
81{
82    let rows = sqlx::query_as::<_, JobRow>(
83        r#"
84        UPDATE awa.jobs
85        SET state = 'available', attempt = 0, run_at = now(),
86            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
87        WHERE queue = $1 AND state = 'failed'
88        RETURNING *
89        "#,
90    )
91    .bind(queue)
92    .fetch_all(executor)
93    .await?;
94
95    Ok(rows)
96}
97
98/// Discard (delete) all failed jobs of a given kind.
99pub async fn discard_failed<'e, E>(executor: E, kind: &str) -> Result<u64, AwaError>
100where
101    E: PgExecutor<'e>,
102{
103    let result = sqlx::query("DELETE FROM awa.jobs WHERE kind = $1 AND state = 'failed'")
104        .bind(kind)
105        .execute(executor)
106        .await?;
107
108    Ok(result.rows_affected())
109}
110
111/// Pause a queue. Affects all workers immediately.
112pub async fn pause_queue<'e, E>(
113    executor: E,
114    queue: &str,
115    paused_by: Option<&str>,
116) -> Result<(), AwaError>
117where
118    E: PgExecutor<'e>,
119{
120    sqlx::query(
121        r#"
122        INSERT INTO awa.queue_meta (queue, paused, paused_at, paused_by)
123        VALUES ($1, TRUE, now(), $2)
124        ON CONFLICT (queue) DO UPDATE SET paused = TRUE, paused_at = now(), paused_by = $2
125        "#,
126    )
127    .bind(queue)
128    .bind(paused_by)
129    .execute(executor)
130    .await?;
131
132    Ok(())
133}
134
135/// Resume a paused queue.
136pub async fn resume_queue<'e, E>(executor: E, queue: &str) -> Result<(), AwaError>
137where
138    E: PgExecutor<'e>,
139{
140    sqlx::query("UPDATE awa.queue_meta SET paused = FALSE WHERE queue = $1")
141        .bind(queue)
142        .execute(executor)
143        .await?;
144
145    Ok(())
146}
147
148/// Drain a queue: cancel all non-running, non-terminal jobs.
149pub async fn drain_queue<'e, E>(executor: E, queue: &str) -> Result<u64, AwaError>
150where
151    E: PgExecutor<'e>,
152{
153    let result = sqlx::query(
154        r#"
155        UPDATE awa.jobs
156        SET state = 'cancelled', finalized_at = now(),
157            callback_id = NULL, callback_timeout_at = NULL,
158            callback_filter = NULL, callback_on_complete = NULL,
159            callback_on_fail = NULL, callback_transform = NULL
160        WHERE queue = $1 AND state IN ('available', 'scheduled', 'retryable', 'waiting_external')
161        "#,
162    )
163    .bind(queue)
164    .execute(executor)
165    .await?;
166
167    Ok(result.rows_affected())
168}
169
170/// Queue statistics.
171#[derive(Debug, Clone, Serialize)]
172pub struct QueueStats {
173    pub queue: String,
174    pub available: i64,
175    pub running: i64,
176    pub failed: i64,
177    pub waiting_external: i64,
178    pub completed_last_hour: i64,
179    pub lag_seconds: Option<f64>,
180    pub paused: bool,
181}
182
183/// Get statistics for all queues.
184pub async fn queue_stats<'e, E>(executor: E) -> Result<Vec<QueueStats>, AwaError>
185where
186    E: PgExecutor<'e>,
187{
188    let rows = sqlx::query_as::<_, (String, i64, i64, i64, i64, i64, Option<f64>, bool)>(
189        r#"
190        SELECT
191            j.queue,
192            count(*) FILTER (WHERE j.state = 'available') AS available,
193            count(*) FILTER (WHERE j.state = 'running') AS running,
194            count(*) FILTER (WHERE j.state = 'failed') AS failed,
195            count(*) FILTER (WHERE j.state = 'waiting_external') AS waiting_external,
196            count(*) FILTER (WHERE j.state = 'completed'
197                AND j.finalized_at > now() - interval '1 hour') AS completed_last_hour,
198            EXTRACT(EPOCH FROM (now() - min(j.run_at) FILTER (WHERE j.state = 'available')))::float8 AS lag_seconds,
199            COALESCE(qm.paused, FALSE) AS paused
200        FROM awa.jobs j
201        LEFT JOIN awa.queue_meta qm ON qm.queue = j.queue
202        GROUP BY j.queue, qm.paused
203        "#,
204    )
205    .fetch_all(executor)
206    .await?;
207
208    Ok(rows
209        .into_iter()
210        .map(
211            |(
212                queue,
213                available,
214                running,
215                failed,
216                waiting_external,
217                completed_last_hour,
218                lag_seconds,
219                paused,
220            )| QueueStats {
221                queue,
222                available,
223                running,
224                failed,
225                waiting_external,
226                completed_last_hour,
227                lag_seconds,
228                paused,
229            },
230        )
231        .collect())
232}
233
234/// List jobs with optional filters.
235#[derive(Debug, Clone, Default, Serialize)]
236pub struct ListJobsFilter {
237    pub state: Option<JobState>,
238    pub kind: Option<String>,
239    pub queue: Option<String>,
240    pub tag: Option<String>,
241    pub before_id: Option<i64>,
242    pub limit: Option<i64>,
243}
244
245/// List jobs matching the given filter.
246pub async fn list_jobs<'e, E>(executor: E, filter: &ListJobsFilter) -> Result<Vec<JobRow>, AwaError>
247where
248    E: PgExecutor<'e>,
249{
250    let limit = filter.limit.unwrap_or(100);
251
252    let rows = sqlx::query_as::<_, JobRow>(
253        r#"
254        SELECT * FROM awa.jobs
255        WHERE ($1::awa.job_state IS NULL OR state = $1)
256          AND ($2::text IS NULL OR kind = $2)
257          AND ($3::text IS NULL OR queue = $3)
258          AND ($4::text IS NULL OR tags @> ARRAY[$4]::text[])
259          AND ($5::bigint IS NULL OR id < $5)
260        ORDER BY id DESC
261        LIMIT $6
262        "#,
263    )
264    .bind(filter.state)
265    .bind(&filter.kind)
266    .bind(&filter.queue)
267    .bind(&filter.tag)
268    .bind(filter.before_id)
269    .bind(limit)
270    .fetch_all(executor)
271    .await?;
272
273    Ok(rows)
274}
275
276/// Get a single job by ID.
277pub async fn get_job<'e, E>(executor: E, job_id: i64) -> Result<JobRow, AwaError>
278where
279    E: PgExecutor<'e>,
280{
281    let row = sqlx::query_as::<_, JobRow>("SELECT * FROM awa.jobs WHERE id = $1")
282        .bind(job_id)
283        .fetch_optional(executor)
284        .await?;
285
286    row.ok_or(AwaError::JobNotFound { id: job_id })
287}
288
289/// Count jobs grouped by state.
290pub async fn state_counts<'e, E>(executor: E) -> Result<HashMap<JobState, i64>, AwaError>
291where
292    E: PgExecutor<'e>,
293{
294    let rows =
295        sqlx::query_as::<_, (JobState, i64)>("SELECT state, count(*) FROM awa.jobs GROUP BY state")
296            .fetch_all(executor)
297            .await?;
298
299    Ok(rows.into_iter().collect())
300}
301
302/// Return all distinct job kinds.
303pub async fn distinct_kinds<'e, E>(executor: E) -> Result<Vec<String>, AwaError>
304where
305    E: PgExecutor<'e>,
306{
307    let rows = sqlx::query_scalar::<_, String>("SELECT DISTINCT kind FROM awa.jobs ORDER BY kind")
308        .fetch_all(executor)
309        .await?;
310
311    Ok(rows)
312}
313
314/// Return all distinct queue names.
315pub async fn distinct_queues<'e, E>(executor: E) -> Result<Vec<String>, AwaError>
316where
317    E: PgExecutor<'e>,
318{
319    let rows =
320        sqlx::query_scalar::<_, String>("SELECT DISTINCT queue FROM awa.jobs ORDER BY queue")
321            .fetch_all(executor)
322            .await?;
323
324    Ok(rows)
325}
326
327/// Retry multiple jobs by ID. Only retries failed, cancelled, or waiting_external jobs.
328pub async fn bulk_retry<'e, E>(executor: E, ids: &[i64]) -> Result<Vec<JobRow>, AwaError>
329where
330    E: PgExecutor<'e>,
331{
332    let rows = sqlx::query_as::<_, JobRow>(
333        r#"
334        UPDATE awa.jobs
335        SET state = 'available', attempt = 0, run_at = now(),
336            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
337            callback_id = NULL, callback_timeout_at = NULL,
338            callback_filter = NULL, callback_on_complete = NULL,
339            callback_on_fail = NULL, callback_transform = NULL
340        WHERE id = ANY($1) AND state IN ('failed', 'cancelled', 'waiting_external')
341        RETURNING *
342        "#,
343    )
344    .bind(ids)
345    .fetch_all(executor)
346    .await?;
347
348    Ok(rows)
349}
350
351/// Cancel multiple jobs by ID. Only cancels non-terminal jobs.
352pub async fn bulk_cancel<'e, E>(executor: E, ids: &[i64]) -> Result<Vec<JobRow>, AwaError>
353where
354    E: PgExecutor<'e>,
355{
356    let rows = sqlx::query_as::<_, JobRow>(
357        r#"
358        UPDATE awa.jobs
359        SET state = 'cancelled', finalized_at = now(),
360            callback_id = NULL, callback_timeout_at = NULL,
361            callback_filter = NULL, callback_on_complete = NULL,
362            callback_on_fail = NULL, callback_transform = NULL
363        WHERE id = ANY($1) AND state NOT IN ('completed', 'failed', 'cancelled')
364        RETURNING *
365        "#,
366    )
367    .bind(ids)
368    .fetch_all(executor)
369    .await?;
370
371    Ok(rows)
372}
373
374/// A bucketed count of jobs by state over time.
375#[derive(Debug, Clone, Serialize)]
376pub struct StateTimeseriesBucket {
377    pub bucket: chrono::DateTime<chrono::Utc>,
378    pub state: JobState,
379    pub count: i64,
380}
381
382/// Return time-bucketed state counts over the last N minutes.
383pub async fn state_timeseries<'e, E>(
384    executor: E,
385    minutes: i32,
386) -> Result<Vec<StateTimeseriesBucket>, AwaError>
387where
388    E: PgExecutor<'e>,
389{
390    let rows = sqlx::query_as::<_, (chrono::DateTime<chrono::Utc>, JobState, i64)>(
391        r#"
392        SELECT
393            date_trunc('minute', created_at) AS bucket,
394            state,
395            count(*) AS count
396        FROM awa.jobs
397        WHERE created_at >= now() - make_interval(mins => $1)
398        GROUP BY bucket, state
399        ORDER BY bucket
400        "#,
401    )
402    .bind(minutes)
403    .fetch_all(executor)
404    .await?;
405
406    Ok(rows
407        .into_iter()
408        .map(|(bucket, state, count)| StateTimeseriesBucket {
409            bucket,
410            state,
411            count,
412        })
413        .collect())
414}
415
416/// Register a callback for a running job, writing the callback_id and timeout
417/// to the database immediately.
418///
419/// Call this BEFORE sending the callback_id to the external system to avoid
420/// the race condition where the external system fires before the DB knows
421/// about the callback.
422///
423/// Returns the generated callback UUID on success.
424pub async fn register_callback<'e, E>(
425    executor: E,
426    job_id: i64,
427    run_lease: i64,
428    timeout: std::time::Duration,
429) -> Result<Uuid, AwaError>
430where
431    E: PgExecutor<'e>,
432{
433    let callback_id = Uuid::new_v4();
434    let timeout_secs = timeout.as_secs_f64();
435    let result = sqlx::query(
436        r#"UPDATE awa.jobs
437           SET callback_id = $2,
438               callback_timeout_at = now() + make_interval(secs => $3),
439               callback_filter = NULL,
440               callback_on_complete = NULL,
441               callback_on_fail = NULL,
442               callback_transform = NULL
443           WHERE id = $1 AND state = 'running' AND run_lease = $4"#,
444    )
445    .bind(job_id)
446    .bind(callback_id)
447    .bind(timeout_secs)
448    .bind(run_lease)
449    .execute(executor)
450    .await?;
451    if result.rows_affected() == 0 {
452        return Err(AwaError::Validation("job is not in running state".into()));
453    }
454    Ok(callback_id)
455}
456
457/// Complete a waiting job via external callback.
458///
459/// Accepts jobs in `waiting_external` or `running` state (race handling: the
460/// external system may fire before the executor transitions to `waiting_external`).
461///
462/// The `payload` parameter is accepted but not stored on the job — it exists
463/// for callers who want to pass callback data through and will be used by the
464/// planned CEL expression filtering/transforms feature. Callers can process
465/// the payload immediately or enqueue a follow-up job with it.
466pub async fn complete_external<'e, E>(
467    executor: E,
468    callback_id: Uuid,
469    _payload: Option<serde_json::Value>,
470) -> Result<JobRow, AwaError>
471where
472    E: PgExecutor<'e>,
473{
474    let row = sqlx::query_as::<_, JobRow>(
475        r#"
476        UPDATE awa.jobs
477        SET state = 'completed',
478            finalized_at = now(),
479            callback_id = NULL,
480            callback_timeout_at = NULL,
481            callback_filter = NULL,
482            callback_on_complete = NULL,
483            callback_on_fail = NULL,
484            callback_transform = NULL,
485            heartbeat_at = NULL,
486            deadline_at = NULL,
487            progress = NULL
488        WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
489        RETURNING *
490        "#,
491    )
492    .bind(callback_id)
493    .fetch_optional(executor)
494    .await?;
495
496    row.ok_or(AwaError::CallbackNotFound {
497        callback_id: callback_id.to_string(),
498    })
499}
500
501/// Fail a waiting job via external callback.
502///
503/// Records the error and transitions to `failed`.
504pub async fn fail_external<'e, E>(
505    executor: E,
506    callback_id: Uuid,
507    error: &str,
508) -> Result<JobRow, AwaError>
509where
510    E: PgExecutor<'e>,
511{
512    let row = sqlx::query_as::<_, JobRow>(
513        r#"
514        UPDATE awa.jobs
515        SET state = 'failed',
516            finalized_at = now(),
517            callback_id = NULL,
518            callback_timeout_at = NULL,
519            callback_filter = NULL,
520            callback_on_complete = NULL,
521            callback_on_fail = NULL,
522            callback_transform = NULL,
523            heartbeat_at = NULL,
524            deadline_at = NULL,
525            errors = errors || jsonb_build_object(
526                'error', $2::text,
527                'attempt', attempt,
528                'at', now()
529            )::jsonb
530        WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
531        RETURNING *
532        "#,
533    )
534    .bind(callback_id)
535    .bind(error)
536    .fetch_optional(executor)
537    .await?;
538
539    row.ok_or(AwaError::CallbackNotFound {
540        callback_id: callback_id.to_string(),
541    })
542}
543
544/// Retry a waiting job via external callback.
545///
546/// Resets to `available` with attempt = 0. The handler must be idempotent
547/// with respect to the external call — a retry re-executes from scratch.
548///
549/// Only accepts `waiting_external` state — unlike complete/fail which are
550/// terminal transitions, retry puts the job back to `available`. Allowing
551/// retry from `running` would risk concurrent dispatch if the original
552/// handler hasn't finished yet.
553pub async fn retry_external<'e, E>(executor: E, callback_id: Uuid) -> Result<JobRow, AwaError>
554where
555    E: PgExecutor<'e>,
556{
557    let row = sqlx::query_as::<_, JobRow>(
558        r#"
559        UPDATE awa.jobs
560        SET state = 'available',
561            attempt = 0,
562            run_at = now(),
563            finalized_at = NULL,
564            callback_id = NULL,
565            callback_timeout_at = NULL,
566            callback_filter = NULL,
567            callback_on_complete = NULL,
568            callback_on_fail = NULL,
569            callback_transform = NULL,
570            heartbeat_at = NULL,
571            deadline_at = NULL
572        WHERE callback_id = $1 AND state = 'waiting_external'
573        RETURNING *
574        "#,
575    )
576    .bind(callback_id)
577    .fetch_optional(executor)
578    .await?;
579
580    row.ok_or(AwaError::CallbackNotFound {
581        callback_id: callback_id.to_string(),
582    })
583}
584
585// ── CEL callback expressions ──────────────────────────────────────────
586
/// CEL expressions attached to a callback registration.
///
/// Every field is optional. A config with all fields `None` stores no
/// expressions on the job, so no expression evaluation takes place when the
/// callback is resolved.
#[derive(Clone, Debug, Default)]
pub struct CallbackConfig {
    /// Gate deciding whether a payload is processed at all. Must yield a bool.
    pub filter: Option<String>,
    /// Decides whether a payload signals success. Must yield a bool.
    pub on_complete: Option<String>,
    /// Decides whether a payload signals failure. Must yield a bool.
    /// Checked before `on_complete`.
    pub on_fail: Option<String>,
    /// Reshapes the payload handed back to the caller. May yield any value.
    pub transform: Option<String>,
}

impl CallbackConfig {
    /// Reports whether the config carries no expression at all.
    pub fn is_empty(&self) -> bool {
        // Destructure so that adding a field forces this check to be revisited.
        let Self {
            filter,
            on_complete,
            on_fail,
            transform,
        } = self;

        [filter, on_complete, on_fail, transform]
            .iter()
            .all(|expr| expr.is_none())
    }
}
612
/// Fallback chosen by the caller of `resolve_callback` for the case where the
/// job has no CEL expressions, or none of its conditions matched.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DefaultAction {
    /// Resolve the callback as a completion.
    Complete,
    /// Resolve the callback as a failure.
    Fail,
    /// Leave the job untouched.
    Ignore,
}
621
622/// Outcome of `resolve_callback`.
623#[derive(Debug)]
624pub enum ResolveOutcome {
625    Completed {
626        payload: Option<serde_json::Value>,
627        job: JobRow,
628    },
629    Failed {
630        job: JobRow,
631    },
632    Ignored {
633        reason: String,
634    },
635}
636
637impl ResolveOutcome {
638    pub fn is_completed(&self) -> bool {
639        matches!(self, ResolveOutcome::Completed { .. })
640    }
641    pub fn is_failed(&self) -> bool {
642        matches!(self, ResolveOutcome::Failed { .. })
643    }
644    pub fn is_ignored(&self) -> bool {
645        matches!(self, ResolveOutcome::Ignored { .. })
646    }
647}
648
649/// Register a callback with optional CEL expressions.
650///
651/// When expressions are provided and the `cel` feature is enabled, each
652/// expression is trial-compiled at registration time so syntax errors are
653/// caught early.
654///
655/// When the `cel` feature is disabled and any expression is non-None,
656/// returns `AwaError::Validation`.
657pub async fn register_callback_with_config<'e, E>(
658    executor: E,
659    job_id: i64,
660    run_lease: i64,
661    timeout: std::time::Duration,
662    config: &CallbackConfig,
663) -> Result<Uuid, AwaError>
664where
665    E: PgExecutor<'e>,
666{
667    // Validate CEL expressions at registration time: compile + check references
668    #[cfg(feature = "cel")]
669    {
670        for (name, expr) in [
671            ("filter", &config.filter),
672            ("on_complete", &config.on_complete),
673            ("on_fail", &config.on_fail),
674            ("transform", &config.transform),
675        ] {
676            if let Some(src) = expr {
677                let program = cel::Program::compile(src).map_err(|e| {
678                    AwaError::Validation(format!("invalid CEL expression for {name}: {e}"))
679                })?;
680
681                // Reject undeclared variables — CEL only reports these at execution
682                // time, so an expression like `missing == 1` would parse fine but
683                // silently fall into the fail-open path at resolve time.
684                let refs = program.references();
685                let bad_vars: Vec<&str> = refs
686                    .variables()
687                    .into_iter()
688                    .filter(|v| *v != "payload")
689                    .collect();
690                if !bad_vars.is_empty() {
691                    return Err(AwaError::Validation(format!(
692                        "CEL expression for {name} references undeclared variable(s): {}; \
693                         only 'payload' is available",
694                        bad_vars.join(", ")
695                    )));
696                }
697            }
698        }
699    }
700
701    #[cfg(not(feature = "cel"))]
702    {
703        if !config.is_empty() {
704            return Err(AwaError::Validation(
705                "CEL expressions require the 'cel' feature".into(),
706            ));
707        }
708    }
709
710    let callback_id = Uuid::new_v4();
711    let timeout_secs = timeout.as_secs_f64();
712
713    let result = sqlx::query(
714        r#"UPDATE awa.jobs
715           SET callback_id = $2,
716               callback_timeout_at = now() + make_interval(secs => $3),
717               callback_filter = $4,
718               callback_on_complete = $5,
719               callback_on_fail = $6,
720               callback_transform = $7
721           WHERE id = $1 AND state = 'running' AND run_lease = $8"#,
722    )
723    .bind(job_id)
724    .bind(callback_id)
725    .bind(timeout_secs)
726    .bind(&config.filter)
727    .bind(&config.on_complete)
728    .bind(&config.on_fail)
729    .bind(&config.transform)
730    .bind(run_lease)
731    .execute(executor)
732    .await?;
733
734    if result.rows_affected() == 0 {
735        return Err(AwaError::Validation("job is not in running state".into()));
736    }
737    Ok(callback_id)
738}
739
740/// Internal action decided by CEL evaluation or default.
741enum ResolveAction {
742    Complete(Option<serde_json::Value>),
743    Fail {
744        error: String,
745        expression: Option<String>,
746    },
747    Ignore(String),
748}
749
750/// Resolve a callback by evaluating CEL expressions against the payload.
751///
752/// Uses a transaction with `SELECT ... FOR UPDATE` for atomicity.
753/// The `default_action` determines behaviour when no CEL conditions match
754/// or no expressions are configured.
755pub async fn resolve_callback(
756    pool: &PgPool,
757    callback_id: Uuid,
758    payload: Option<serde_json::Value>,
759    default_action: DefaultAction,
760) -> Result<ResolveOutcome, AwaError> {
761    let mut tx = pool.begin().await?;
762
763    // Query jobs_hot directly (not the awa.jobs UNION ALL view) because
764    // FOR UPDATE is not reliably supported on UNION views. Waiting_external
765    // jobs are always in jobs_hot (the check constraint on scheduled_jobs
766    // only allows scheduled/retryable).
767    let job = sqlx::query_as::<_, JobRow>(
768        "SELECT * FROM awa.jobs_hot WHERE callback_id = $1
769         AND state = 'waiting_external'
770         FOR UPDATE",
771    )
772    .bind(callback_id)
773    .fetch_optional(&mut *tx)
774    .await?
775    .ok_or(AwaError::CallbackNotFound {
776        callback_id: callback_id.to_string(),
777    })?;
778
779    let action = evaluate_or_default(&job, &payload, default_action)?;
780
781    match action {
782        ResolveAction::Complete(transformed_payload) => {
783            let completed_job = sqlx::query_as::<_, JobRow>(
784                r#"
785                UPDATE awa.jobs
786                SET state = 'completed',
787                    finalized_at = now(),
788                    callback_id = NULL,
789                    callback_timeout_at = NULL,
790                    callback_filter = NULL,
791                    callback_on_complete = NULL,
792                    callback_on_fail = NULL,
793                    callback_transform = NULL,
794                    heartbeat_at = NULL,
795                    deadline_at = NULL,
796                    progress = NULL
797                WHERE id = $1
798                RETURNING *
799                "#,
800            )
801            .bind(job.id)
802            .fetch_one(&mut *tx)
803            .await?;
804
805            tx.commit().await?;
806            Ok(ResolveOutcome::Completed {
807                payload: transformed_payload,
808                job: completed_job,
809            })
810        }
811        ResolveAction::Fail { error, expression } => {
812            let mut error_json = serde_json::json!({
813                "error": error,
814                "attempt": job.attempt,
815                "at": chrono::Utc::now().to_rfc3339(),
816            });
817            if let Some(expr) = expression {
818                error_json["expression"] = serde_json::Value::String(expr);
819            }
820
821            let failed_job = sqlx::query_as::<_, JobRow>(
822                r#"
823                UPDATE awa.jobs
824                SET state = 'failed',
825                    finalized_at = now(),
826                    callback_id = NULL,
827                    callback_timeout_at = NULL,
828                    callback_filter = NULL,
829                    callback_on_complete = NULL,
830                    callback_on_fail = NULL,
831                    callback_transform = NULL,
832                    heartbeat_at = NULL,
833                    deadline_at = NULL,
834                    errors = errors || $2::jsonb
835                WHERE id = $1
836                RETURNING *
837                "#,
838            )
839            .bind(job.id)
840            .bind(error_json)
841            .fetch_one(&mut *tx)
842            .await?;
843
844            tx.commit().await?;
845            Ok(ResolveOutcome::Failed { job: failed_job })
846        }
847        ResolveAction::Ignore(reason) => {
848            // No state change — dropping tx releases FOR UPDATE lock
849            Ok(ResolveOutcome::Ignored { reason })
850        }
851    }
852}
853
854/// Evaluate CEL expressions or fall through to default_action.
855fn evaluate_or_default(
856    job: &JobRow,
857    payload: &Option<serde_json::Value>,
858    default_action: DefaultAction,
859) -> Result<ResolveAction, AwaError> {
860    let has_expressions = job.callback_filter.is_some()
861        || job.callback_on_complete.is_some()
862        || job.callback_on_fail.is_some()
863        || job.callback_transform.is_some();
864
865    if !has_expressions {
866        return Ok(apply_default(default_action, payload));
867    }
868
869    #[cfg(feature = "cel")]
870    {
871        Ok(evaluate_cel(job, payload, default_action))
872    }
873
874    #[cfg(not(feature = "cel"))]
875    {
876        // Expressions are present but CEL feature is not enabled.
877        // Return an error without mutating the job — it stays in waiting_external.
878        let _ = (payload, default_action);
879        Err(AwaError::Validation(
880            "CEL expressions present but 'cel' feature is not enabled".into(),
881        ))
882    }
883}
884
885fn apply_default(
886    default_action: DefaultAction,
887    payload: &Option<serde_json::Value>,
888) -> ResolveAction {
889    match default_action {
890        DefaultAction::Complete => ResolveAction::Complete(payload.clone()),
891        DefaultAction::Fail => ResolveAction::Fail {
892            error: "callback failed: default action".to_string(),
893            expression: None,
894        },
895        DefaultAction::Ignore => {
896            ResolveAction::Ignore("no expressions configured, default is ignore".to_string())
897        }
898    }
899}
900
/// Evaluate the job's stored CEL expressions against the callback payload.
///
/// Evaluation order:
///
/// 1. `callback_filter` — a `false` result ignores the callback.
/// 2. `callback_on_fail` — a `true` result fails the job (checked before
///    `on_complete`, so failure takes precedence).
/// 3. `callback_on_complete` — a `true` result completes the job with the
///    payload passed through `apply_transform`.
/// 4. Otherwise `default_action` is applied.
///
/// Expression errors are fail-open: a failing filter lets the payload through,
/// and a failing `on_fail`/`on_complete` counts as "did not match".
///
/// A missing payload is evaluated as JSON `null`.
///
/// NOTE: the transform is only applied on the `on_complete` path; a completion
/// reached through `default_action` returns the payload untransformed.
#[cfg(feature = "cel")]
fn evaluate_cel(
    job: &JobRow,
    payload: &Option<serde_json::Value>,
    default_action: DefaultAction,
) -> ResolveAction {
    let payload_value = payload.clone().unwrap_or(serde_json::Value::Null);

    // 1. Evaluate filter
    if let Some(filter_expr) = &job.callback_filter {
        match eval_bool(filter_expr, &payload_value, job.id, "filter") {
            Ok(true) => {} // pass through
            Ok(false) => {
                return ResolveAction::Ignore("filter expression returned false".to_string());
            }
            Err(_) => {
                // Fail-open: treat filter error as true (pass through)
            }
        }
    }

    // 2. Evaluate on_fail (before on_complete — fail takes precedence)
    if let Some(on_fail_expr) = &job.callback_on_fail {
        match eval_bool(on_fail_expr, &payload_value, job.id, "on_fail") {
            Ok(true) => {
                return ResolveAction::Fail {
                    error: "callback failed: on_fail expression matched".to_string(),
                    expression: Some(on_fail_expr.clone()),
                };
            }
            Ok(false) => {} // don't fail
            Err(_) => {
                // Fail-open: treat on_fail error as false (don't fail)
            }
        }
    }

    // 3. Evaluate on_complete
    if let Some(on_complete_expr) = &job.callback_on_complete {
        match eval_bool(on_complete_expr, &payload_value, job.id, "on_complete") {
            Ok(true) => {
                // Complete with optional transform
                let transformed = apply_transform(job, &payload_value);
                return ResolveAction::Complete(Some(transformed));
            }
            Ok(false) => {} // don't complete
            Err(_) => {
                // Fail-open: treat on_complete error as false (don't complete)
            }
        }
    }

    // 4. Neither condition matched → apply default_action
    match apply_default(default_action, payload) {
        // `apply_default` words its ignore reason for jobs without any
        // expression. On this path expressions exist but none of them decided
        // the outcome, so report that instead.
        ResolveAction::Ignore(_) => {
            ResolveAction::Ignore("no callback expression matched, default is ignore".to_string())
        }
        fallback => fallback,
    }
}
956
/// Compile and run one CEL expression that is expected to yield a bool.
///
/// The expression sees a single variable, `payload`. Any problem — compile
/// error, failure to bind the payload, execution error, or a non-bool result —
/// is logged as a warning (tagged with `job_id` and `expression_name`) and
/// reported as `Err(())`; the caller decides how to treat it.
#[cfg(feature = "cel")]
fn eval_bool(
    expression: &str,
    payload_value: &serde_json::Value,
    job_id: i64,
    expression_name: &str,
) -> Result<bool, ()> {
    let program = cel::Program::compile(expression).map_err(|e| {
        tracing::warn!(
            job_id,
            expression_name,
            expression,
            error = %e,
            "CEL compilation error during evaluation"
        );
    })?;

    let mut context = cel::Context::default();
    context
        .add_variable("payload", payload_value.clone())
        .map_err(|e| {
            tracing::warn!(
                job_id,
                expression_name,
                error = %e,
                "Failed to add payload variable to CEL context"
            );
        })?;

    let outcome = program.execute(&context).map_err(|e| {
        tracing::warn!(
            job_id,
            expression_name,
            expression,
            error = %e,
            "CEL execution error"
        );
    })?;

    if let cel::Value::Bool(flag) = outcome {
        return Ok(flag);
    }

    tracing::warn!(
        job_id,
        expression_name,
        expression,
        result_type = ?outcome.type_of(),
        "CEL expression returned non-bool"
    );
    Err(())
}
1013
/// Run the job's `callback_transform` expression over the payload.
///
/// Returns the expression's result converted to JSON. When the job has no
/// transform, or when compiling, binding the payload, executing, or converting
/// the result fails, a clone of the original payload is returned instead;
/// failures are logged as warnings.
#[cfg(feature = "cel")]
fn apply_transform(job: &JobRow, payload_value: &serde_json::Value) -> serde_json::Value {
    let transform_expr = match job.callback_transform.as_ref() {
        None => return payload_value.clone(),
        Some(expr) => expr,
    };

    let program = match cel::Program::compile(transform_expr) {
        Ok(compiled) => compiled,
        Err(e) => {
            tracing::warn!(
                job_id = job.id,
                expression = transform_expr,
                error = %e,
                "CEL transform compilation error, using original payload"
            );
            return payload_value.clone();
        }
    };

    let mut context = cel::Context::default();
    if let Err(e) = context.add_variable("payload", payload_value.clone()) {
        tracing::warn!(
            job_id = job.id,
            error = %e,
            "Failed to add payload variable for transform"
        );
        return payload_value.clone();
    }

    let evaluated = match program.execute(&context) {
        Ok(value) => value,
        Err(e) => {
            tracing::warn!(
                job_id = job.id,
                expression = transform_expr,
                error = %e,
                "CEL transform execution error, using original payload"
            );
            return payload_value.clone();
        }
    };

    match evaluated.json() {
        Ok(json) => json,
        Err(e) => {
            tracing::warn!(
                job_id = job.id,
                expression = transform_expr,
                error = %e,
                "CEL transform result could not be converted to JSON, using original payload"
            );
            payload_value.clone()
        }
    }
}