Skip to main content

awa_model/
admin.rs

1use crate::error::AwaError;
2use crate::job::{JobRow, JobState};
3use chrono::{DateTime, Duration, Utc};
4use serde::{Deserialize, Serialize};
5use sqlx::types::Json;
6use sqlx::PgExecutor;
7use sqlx::PgPool;
8use std::cmp::max;
9use std::collections::HashMap;
10use uuid::Uuid;
11
12/// Retry a single failed, cancelled, or waiting_external job.
13pub async fn retry<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
14where
15    E: PgExecutor<'e>,
16{
17    sqlx::query_as::<_, JobRow>(
18        r#"
19        UPDATE awa.jobs
20        SET state = 'available', attempt = 0, run_at = now(),
21            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
22            callback_id = NULL, callback_timeout_at = NULL,
23            callback_filter = NULL, callback_on_complete = NULL,
24            callback_on_fail = NULL, callback_transform = NULL
25        WHERE id = $1 AND state IN ('failed', 'cancelled', 'waiting_external')
26        RETURNING *
27        "#,
28    )
29    .bind(job_id)
30    .fetch_optional(executor)
31    .await?
32    .ok_or(AwaError::JobNotFound { id: job_id })
33    .map(Some)
34}
35
36/// Cancel a single non-terminal job.
37pub async fn cancel<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
38where
39    E: PgExecutor<'e>,
40{
41    sqlx::query_as::<_, JobRow>(
42        r#"
43        UPDATE awa.jobs
44        SET state = 'cancelled', finalized_at = now(),
45            callback_id = NULL, callback_timeout_at = NULL,
46            callback_filter = NULL, callback_on_complete = NULL,
47            callback_on_fail = NULL, callback_transform = NULL
48        WHERE id = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
49        RETURNING *
50        "#,
51    )
52    .bind(job_id)
53    .fetch_optional(executor)
54    .await?
55    .ok_or(AwaError::JobNotFound { id: job_id })
56    .map(Some)
57}
58
59/// Cancel a job by its unique key components.
60///
61/// Reconstructs the BLAKE3 unique key from the same inputs used at insert time
62/// (kind, optional queue, optional args, optional period bucket), then cancels
63/// the single oldest matching non-terminal job. Returns `None` if no matching
64/// job was found (already completed, already cancelled, or never existed).
65///
66/// The parameters must match what was used at insert time: pass `queue` only if
67/// the original `UniqueOpts` had `by_queue: true`, `args` only if `by_args: true`,
68/// and `period_bucket` only if `by_period` was set. Mismatched components produce
69/// a different hash and the job won't be found.
70///
71/// Only one job is cancelled per call (the oldest by `id`). This is intentional:
72/// unique key enforcement uses a state bitmask, so multiple rows with the same
73/// key can legally coexist (e.g., one `waiting_external` + one `available`).
74/// Cancelling all of them in one shot would be surprising.
75///
76/// This is useful when the caller knows the job kind and args but not the job ID —
77/// e.g., cancelling a scheduled reminder when the triggering condition is resolved.
78///
79/// # Implementation notes
80///
81/// Queries `jobs_hot` and `scheduled_jobs` directly rather than the `awa.jobs`
82/// UNION ALL view, because PostgreSQL does not support `FOR UPDATE` on UNION
83/// views. The CTE selects candidate IDs without row locks; blocking on a
84/// concurrently-locked row (e.g., one being processed by a worker) happens
85/// implicitly during the UPDATE phase via the writable view trigger. If the
86/// worker completes the job before the UPDATE acquires the lock, the state
87/// check (`NOT IN ('completed', 'failed', 'cancelled')`) causes the cancel
88/// to no-op and return `None`.
89///
90/// The lookup scans `unique_key` on both physical tables without a dedicated
91/// index. This is acceptable for low-volume use cases. For high-volume tables,
92/// consider adding a partial index on `unique_key WHERE unique_key IS NOT NULL`
93/// or routing through `job_unique_claims` (which is already indexed).
94pub async fn cancel_by_unique_key<'e, E>(
95    executor: E,
96    kind: &str,
97    queue: Option<&str>,
98    args: Option<&serde_json::Value>,
99    period_bucket: Option<i64>,
100) -> Result<Option<JobRow>, AwaError>
101where
102    E: PgExecutor<'e>,
103{
104    let unique_key = crate::unique::compute_unique_key(kind, queue, args, period_bucket);
105
106    // Find the oldest matching job across both physical tables. CTE selects
107    // candidate IDs without row locks; blocking on concurrently-locked rows
108    // happens implicitly during the UPDATE via the writable view trigger.
109    let row = sqlx::query_as::<_, JobRow>(
110        r#"
111        WITH candidates AS (
112            SELECT id FROM awa.jobs_hot
113            WHERE unique_key = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
114            UNION ALL
115            SELECT id FROM awa.scheduled_jobs
116            WHERE unique_key = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
117            ORDER BY id ASC
118            LIMIT 1
119        )
120        UPDATE awa.jobs
121        SET state = 'cancelled', finalized_at = now(),
122            callback_id = NULL, callback_timeout_at = NULL,
123            callback_filter = NULL, callback_on_complete = NULL,
124            callback_on_fail = NULL, callback_transform = NULL
125        FROM candidates
126        WHERE awa.jobs.id = candidates.id
127        RETURNING awa.jobs.*
128        "#,
129    )
130    .bind(&unique_key)
131    .fetch_optional(executor)
132    .await?;
133
134    Ok(row)
135}
136
137/// Retry all failed jobs of a given kind.
138pub async fn retry_failed_by_kind<'e, E>(executor: E, kind: &str) -> Result<Vec<JobRow>, AwaError>
139where
140    E: PgExecutor<'e>,
141{
142    let rows = sqlx::query_as::<_, JobRow>(
143        r#"
144        UPDATE awa.jobs
145        SET state = 'available', attempt = 0, run_at = now(),
146            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
147        WHERE kind = $1 AND state = 'failed'
148        RETURNING *
149        "#,
150    )
151    .bind(kind)
152    .fetch_all(executor)
153    .await?;
154
155    Ok(rows)
156}
157
158/// Retry all failed jobs in a given queue.
159pub async fn retry_failed_by_queue<'e, E>(executor: E, queue: &str) -> Result<Vec<JobRow>, AwaError>
160where
161    E: PgExecutor<'e>,
162{
163    let rows = sqlx::query_as::<_, JobRow>(
164        r#"
165        UPDATE awa.jobs
166        SET state = 'available', attempt = 0, run_at = now(),
167            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
168        WHERE queue = $1 AND state = 'failed'
169        RETURNING *
170        "#,
171    )
172    .bind(queue)
173    .fetch_all(executor)
174    .await?;
175
176    Ok(rows)
177}
178
179/// Discard (delete) all failed jobs of a given kind.
180pub async fn discard_failed<'e, E>(executor: E, kind: &str) -> Result<u64, AwaError>
181where
182    E: PgExecutor<'e>,
183{
184    let result = sqlx::query("DELETE FROM awa.jobs WHERE kind = $1 AND state = 'failed'")
185        .bind(kind)
186        .execute(executor)
187        .await?;
188
189    Ok(result.rows_affected())
190}
191
192/// Pause a queue. Affects all workers immediately.
193pub async fn pause_queue<'e, E>(
194    executor: E,
195    queue: &str,
196    paused_by: Option<&str>,
197) -> Result<(), AwaError>
198where
199    E: PgExecutor<'e>,
200{
201    sqlx::query(
202        r#"
203        INSERT INTO awa.queue_meta (queue, paused, paused_at, paused_by)
204        VALUES ($1, TRUE, now(), $2)
205        ON CONFLICT (queue) DO UPDATE SET paused = TRUE, paused_at = now(), paused_by = $2
206        "#,
207    )
208    .bind(queue)
209    .bind(paused_by)
210    .execute(executor)
211    .await?;
212
213    Ok(())
214}
215
216/// Resume a paused queue.
217pub async fn resume_queue<'e, E>(executor: E, queue: &str) -> Result<(), AwaError>
218where
219    E: PgExecutor<'e>,
220{
221    sqlx::query("UPDATE awa.queue_meta SET paused = FALSE WHERE queue = $1")
222        .bind(queue)
223        .execute(executor)
224        .await?;
225
226    Ok(())
227}
228
229/// Drain a queue: cancel all non-running, non-terminal jobs.
230pub async fn drain_queue<'e, E>(executor: E, queue: &str) -> Result<u64, AwaError>
231where
232    E: PgExecutor<'e>,
233{
234    let result = sqlx::query(
235        r#"
236        UPDATE awa.jobs
237        SET state = 'cancelled', finalized_at = now(),
238            callback_id = NULL, callback_timeout_at = NULL,
239            callback_filter = NULL, callback_on_complete = NULL,
240            callback_on_fail = NULL, callback_transform = NULL
241        WHERE queue = $1 AND state IN ('available', 'scheduled', 'retryable', 'waiting_external')
242        "#,
243    )
244    .bind(queue)
245    .execute(executor)
246    .await?;
247
248    Ok(result.rows_affected())
249}
250
251/// Queue statistics.
252#[derive(Debug, Clone, Serialize)]
253pub struct QueueStats {
254    pub queue: String,
255    /// All non-terminal jobs for the queue, including running and waiting_external.
256    pub total_queued: i64,
257    pub scheduled: i64,
258    pub available: i64,
259    pub retryable: i64,
260    pub running: i64,
261    pub failed: i64,
262    pub waiting_external: i64,
263    pub completed_last_hour: i64,
264    pub lag_seconds: Option<f64>,
265    pub paused: bool,
266}
267
268/// Snapshot of a per-queue rate limit configuration.
269#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
270pub struct RateLimitSnapshot {
271    pub max_rate: f64,
272    pub burst: u32,
273}
274
275/// Runtime concurrency mode for a queue.
276#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
277#[serde(rename_all = "snake_case")]
278pub enum QueueRuntimeMode {
279    HardReserved,
280    Weighted,
281}
282
283/// Per-queue configuration published by a worker runtime instance.
284#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
285pub struct QueueRuntimeConfigSnapshot {
286    pub mode: QueueRuntimeMode,
287    pub max_workers: Option<u32>,
288    pub min_workers: Option<u32>,
289    pub weight: Option<u32>,
290    pub global_max_workers: Option<u32>,
291    pub poll_interval_ms: u64,
292    pub deadline_duration_secs: u64,
293    pub priority_aging_interval_secs: u64,
294    pub rate_limit: Option<RateLimitSnapshot>,
295}
296
297/// Runtime state for one queue on one worker instance.
298#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
299pub struct QueueRuntimeSnapshot {
300    pub queue: String,
301    pub in_flight: u32,
302    pub overflow_held: Option<u32>,
303    pub config: QueueRuntimeConfigSnapshot,
304}
305
306/// Data written by a worker runtime into the observability snapshot table.
307#[derive(Debug, Clone, Serialize, Deserialize)]
308pub struct RuntimeSnapshotInput {
309    pub instance_id: Uuid,
310    pub hostname: Option<String>,
311    pub pid: i32,
312    pub version: String,
313    pub started_at: DateTime<Utc>,
314    pub snapshot_interval_ms: i64,
315    pub healthy: bool,
316    pub postgres_connected: bool,
317    pub poll_loop_alive: bool,
318    pub heartbeat_alive: bool,
319    pub maintenance_alive: bool,
320    pub shutting_down: bool,
321    pub leader: bool,
322    pub global_max_workers: Option<u32>,
323    pub queues: Vec<QueueRuntimeSnapshot>,
324}
325
326/// A worker runtime instance as exposed through the admin API.
327#[derive(Debug, Clone, Serialize, Deserialize)]
328pub struct RuntimeInstance {
329    pub instance_id: Uuid,
330    pub hostname: Option<String>,
331    pub pid: i32,
332    pub version: String,
333    pub started_at: DateTime<Utc>,
334    pub last_seen_at: DateTime<Utc>,
335    pub snapshot_interval_ms: i64,
336    pub stale: bool,
337    pub healthy: bool,
338    pub postgres_connected: bool,
339    pub poll_loop_alive: bool,
340    pub heartbeat_alive: bool,
341    pub maintenance_alive: bool,
342    pub shutting_down: bool,
343    pub leader: bool,
344    pub global_max_workers: Option<u32>,
345    pub queues: Vec<QueueRuntimeSnapshot>,
346}
347
348impl RuntimeInstance {
349    fn stale_cutoff(interval_ms: i64) -> Duration {
350        let interval_ms = max(interval_ms, 1_000);
351        Duration::milliseconds(max(interval_ms.saturating_mul(3), 30_000))
352    }
353
354    fn from_db_row(row: RuntimeInstanceRow, now: DateTime<Utc>) -> Self {
355        let stale = row.last_seen_at + Self::stale_cutoff(row.snapshot_interval_ms) < now;
356        Self {
357            instance_id: row.instance_id,
358            hostname: row.hostname,
359            pid: row.pid,
360            version: row.version,
361            started_at: row.started_at,
362            last_seen_at: row.last_seen_at,
363            snapshot_interval_ms: row.snapshot_interval_ms,
364            stale,
365            healthy: row.healthy,
366            postgres_connected: row.postgres_connected,
367            poll_loop_alive: row.poll_loop_alive,
368            heartbeat_alive: row.heartbeat_alive,
369            maintenance_alive: row.maintenance_alive,
370            shutting_down: row.shutting_down,
371            leader: row.leader,
372            global_max_workers: row.global_max_workers.map(|v| v as u32),
373            queues: row.queues.0,
374        }
375    }
376}
377
378/// Cluster-wide runtime overview.
379#[derive(Debug, Clone, Serialize, Deserialize)]
380pub struct RuntimeOverview {
381    pub total_instances: usize,
382    pub live_instances: usize,
383    pub stale_instances: usize,
384    pub healthy_instances: usize,
385    pub leader_instances: usize,
386    pub instances: Vec<RuntimeInstance>,
387}
388
389/// Queue-centric runtime/config summary aggregated across worker instances.
390#[derive(Debug, Clone, Serialize, Deserialize)]
391pub struct QueueRuntimeSummary {
392    pub queue: String,
393    pub instance_count: usize,
394    pub live_instances: usize,
395    pub stale_instances: usize,
396    pub healthy_instances: usize,
397    pub total_in_flight: u64,
398    pub overflow_held_total: Option<u64>,
399    pub config_mismatch: bool,
400    pub config: Option<QueueRuntimeConfigSnapshot>,
401}
402
403#[derive(Debug, sqlx::FromRow)]
404struct RuntimeInstanceRow {
405    instance_id: Uuid,
406    hostname: Option<String>,
407    pid: i32,
408    version: String,
409    started_at: DateTime<Utc>,
410    last_seen_at: DateTime<Utc>,
411    snapshot_interval_ms: i64,
412    healthy: bool,
413    postgres_connected: bool,
414    poll_loop_alive: bool,
415    heartbeat_alive: bool,
416    maintenance_alive: bool,
417    shutting_down: bool,
418    leader: bool,
419    global_max_workers: Option<i32>,
420    queues: Json<Vec<QueueRuntimeSnapshot>>,
421}
422
423/// Upsert a runtime observability snapshot for one worker instance.
424pub async fn upsert_runtime_snapshot<'e, E>(
425    executor: E,
426    snapshot: &RuntimeSnapshotInput,
427) -> Result<(), AwaError>
428where
429    E: PgExecutor<'e>,
430{
431    sqlx::query(
432        r#"
433        INSERT INTO awa.runtime_instances (
434            instance_id,
435            hostname,
436            pid,
437            version,
438            started_at,
439            last_seen_at,
440            snapshot_interval_ms,
441            healthy,
442            postgres_connected,
443            poll_loop_alive,
444            heartbeat_alive,
445            maintenance_alive,
446            shutting_down,
447            leader,
448            global_max_workers,
449            queues
450        )
451        VALUES (
452            $1, $2, $3, $4, $5, now(), $6, $7, $8, $9, $10, $11, $12, $13, $14, $15
453        )
454        ON CONFLICT (instance_id) DO UPDATE SET
455            hostname = EXCLUDED.hostname,
456            pid = EXCLUDED.pid,
457            version = EXCLUDED.version,
458            started_at = EXCLUDED.started_at,
459            last_seen_at = now(),
460            snapshot_interval_ms = EXCLUDED.snapshot_interval_ms,
461            healthy = EXCLUDED.healthy,
462            postgres_connected = EXCLUDED.postgres_connected,
463            poll_loop_alive = EXCLUDED.poll_loop_alive,
464            heartbeat_alive = EXCLUDED.heartbeat_alive,
465            maintenance_alive = EXCLUDED.maintenance_alive,
466            shutting_down = EXCLUDED.shutting_down,
467            leader = EXCLUDED.leader,
468            global_max_workers = EXCLUDED.global_max_workers,
469            queues = EXCLUDED.queues
470        "#,
471    )
472    .bind(snapshot.instance_id)
473    .bind(snapshot.hostname.as_deref())
474    .bind(snapshot.pid)
475    .bind(&snapshot.version)
476    .bind(snapshot.started_at)
477    .bind(snapshot.snapshot_interval_ms)
478    .bind(snapshot.healthy)
479    .bind(snapshot.postgres_connected)
480    .bind(snapshot.poll_loop_alive)
481    .bind(snapshot.heartbeat_alive)
482    .bind(snapshot.maintenance_alive)
483    .bind(snapshot.shutting_down)
484    .bind(snapshot.leader)
485    .bind(snapshot.global_max_workers.map(|v| v as i32))
486    .bind(Json(&snapshot.queues))
487    .execute(executor)
488    .await?;
489
490    Ok(())
491}
492
493/// Opportunistically delete long-stale runtime snapshot rows.
494pub async fn cleanup_runtime_snapshots<'e, E>(
495    executor: E,
496    max_age: Duration,
497) -> Result<u64, AwaError>
498where
499    E: PgExecutor<'e>,
500{
501    let seconds = max(max_age.num_seconds(), 1);
502    let result = sqlx::query(
503        "DELETE FROM awa.runtime_instances WHERE last_seen_at < now() - make_interval(secs => $1)",
504    )
505    .bind(seconds)
506    .execute(executor)
507    .await?;
508
509    Ok(result.rows_affected())
510}
511
512/// List all runtime instances ordered with leader/live instances first.
513pub async fn list_runtime_instances<'e, E>(executor: E) -> Result<Vec<RuntimeInstance>, AwaError>
514where
515    E: PgExecutor<'e>,
516{
517    let rows = sqlx::query_as::<_, RuntimeInstanceRow>(
518        r#"
519        SELECT
520            instance_id,
521            hostname,
522            pid,
523            version,
524            started_at,
525            last_seen_at,
526            snapshot_interval_ms,
527            healthy,
528            postgres_connected,
529            poll_loop_alive,
530            heartbeat_alive,
531            maintenance_alive,
532            shutting_down,
533            leader,
534            global_max_workers,
535            queues
536        FROM awa.runtime_instances
537        ORDER BY leader DESC, last_seen_at DESC, started_at DESC
538        "#,
539    )
540    .fetch_all(executor)
541    .await?;
542
543    let now = Utc::now();
544    Ok(rows
545        .into_iter()
546        .map(|row| RuntimeInstance::from_db_row(row, now))
547        .collect())
548}
549
550/// Cluster runtime overview with instance list.
551pub async fn runtime_overview<'e, E>(executor: E) -> Result<RuntimeOverview, AwaError>
552where
553    E: PgExecutor<'e>,
554{
555    let instances = list_runtime_instances(executor).await?;
556    let total_instances = instances.len();
557    let stale_instances = instances.iter().filter(|i| i.stale).count();
558    let live_instances = total_instances.saturating_sub(stale_instances);
559    let healthy_instances = instances.iter().filter(|i| !i.stale && i.healthy).count();
560    let leader_instances = instances.iter().filter(|i| !i.stale && i.leader).count();
561
562    Ok(RuntimeOverview {
563        total_instances,
564        live_instances,
565        stale_instances,
566        healthy_instances,
567        leader_instances,
568        instances,
569    })
570}
571
572/// Queue runtime/config summary aggregated across worker snapshots.
573pub async fn queue_runtime_summary<'e, E>(executor: E) -> Result<Vec<QueueRuntimeSummary>, AwaError>
574where
575    E: PgExecutor<'e>,
576{
577    let instances = list_runtime_instances(executor).await?;
578    let mut by_queue: HashMap<String, Vec<(bool, bool, QueueRuntimeSnapshot)>> = HashMap::new();
579
580    for instance in instances {
581        let is_live = !instance.stale;
582        let is_healthy = is_live && instance.healthy;
583        for queue in instance.queues {
584            by_queue
585                .entry(queue.queue.clone())
586                .or_default()
587                .push((is_live, is_healthy, queue));
588        }
589    }
590
591    let mut summaries: Vec<_> = by_queue
592        .into_iter()
593        .map(|(queue, entries)| {
594            let instance_count = entries.len();
595            let live_instances = entries.iter().filter(|(live, _, _)| *live).count();
596            let stale_instances = instance_count.saturating_sub(live_instances);
597            let healthy_instances = entries.iter().filter(|(_, healthy, _)| *healthy).count();
598            let total_in_flight = entries
599                .iter()
600                .filter(|(live, _, _)| *live)
601                .map(|(_, _, queue)| u64::from(queue.in_flight))
602                .sum();
603
604            let overflow_total: u64 = entries
605                .iter()
606                .filter(|(live, _, _)| *live)
607                .filter_map(|(_, _, queue)| queue.overflow_held.map(u64::from))
608                .sum();
609
610            let live_configs: Vec<_> = entries
611                .iter()
612                .filter(|(live, _, _)| *live)
613                .map(|(_, _, queue)| queue.config.clone())
614                .collect();
615            let config_candidates = if live_configs.is_empty() {
616                entries
617                    .iter()
618                    .map(|(_, _, queue)| queue.config.clone())
619                    .collect::<Vec<_>>()
620            } else {
621                live_configs
622            };
623            let config = config_candidates.first().cloned();
624            let config_mismatch = config_candidates
625                .iter()
626                .skip(1)
627                .any(|candidate| Some(candidate) != config.as_ref());
628
629            QueueRuntimeSummary {
630                queue,
631                instance_count,
632                live_instances,
633                stale_instances,
634                healthy_instances,
635                total_in_flight,
636                overflow_held_total: config
637                    .as_ref()
638                    .filter(|cfg| cfg.mode == QueueRuntimeMode::Weighted)
639                    .map(|_| overflow_total),
640                config_mismatch,
641                config,
642            }
643        })
644        .collect();
645
646    summaries.sort_by(|a, b| a.queue.cmp(&b.queue));
647    Ok(summaries)
648}
649
650/// Get statistics for all queues.
651pub async fn queue_stats<'e, E>(executor: E) -> Result<Vec<QueueStats>, AwaError>
652where
653    E: PgExecutor<'e>,
654{
655    let rows = sqlx::query_as::<
656        _,
657        (
658            String,
659            i64,
660            i64,
661            i64,
662            i64,
663            i64,
664            i64,
665            i64,
666            i64,
667            Option<f64>,
668            bool,
669        ),
670    >(
671        r#"
672        WITH available_lag AS (
673            SELECT
674                queue,
675                EXTRACT(EPOCH FROM (now() - min(run_at)))::float8 AS lag_seconds
676            FROM awa.jobs_hot
677            WHERE state = 'available'
678            GROUP BY queue
679        ),
680        completed_recent AS (
681            SELECT
682                queue,
683                count(*)::bigint AS completed_last_hour
684            FROM awa.jobs_hot
685            WHERE state = 'completed'
686              AND finalized_at > now() - interval '1 hour'
687            GROUP BY queue
688        )
689        SELECT
690            qs.queue,
691            qs.scheduled + qs.available + qs.running + qs.retryable + qs.waiting_external AS total_queued,
692            qs.scheduled,
693            qs.available,
694            qs.retryable,
695            qs.running,
696            qs.failed,
697            qs.waiting_external,
698            COALESCE(cr.completed_last_hour, 0) AS completed_last_hour,
699            al.lag_seconds,
700            COALESCE(qm.paused, FALSE) AS paused
701        FROM awa.queue_state_counts qs
702        LEFT JOIN available_lag al ON al.queue = qs.queue
703        LEFT JOIN completed_recent cr ON cr.queue = qs.queue
704        LEFT JOIN awa.queue_meta qm ON qm.queue = qs.queue
705        ORDER BY qs.queue
706        "#,
707    )
708    .fetch_all(executor)
709    .await?;
710
711    Ok(rows
712        .into_iter()
713        .map(
714            |(
715                queue,
716                total_queued,
717                scheduled,
718                available,
719                retryable,
720                running,
721                failed,
722                waiting_external,
723                completed_last_hour,
724                lag_seconds,
725                paused,
726            )| QueueStats {
727                queue,
728                total_queued,
729                scheduled,
730                available,
731                retryable,
732                running,
733                failed,
734                waiting_external,
735                completed_last_hour,
736                lag_seconds,
737                paused,
738            },
739        )
740        .collect())
741}
742
743/// List jobs with optional filters.
744#[derive(Debug, Clone, Default, Serialize)]
745pub struct ListJobsFilter {
746    pub state: Option<JobState>,
747    pub kind: Option<String>,
748    pub queue: Option<String>,
749    pub tag: Option<String>,
750    pub before_id: Option<i64>,
751    pub limit: Option<i64>,
752}
753
754/// List jobs matching the given filter.
755pub async fn list_jobs<'e, E>(executor: E, filter: &ListJobsFilter) -> Result<Vec<JobRow>, AwaError>
756where
757    E: PgExecutor<'e>,
758{
759    let limit = filter.limit.unwrap_or(100);
760
761    let rows = sqlx::query_as::<_, JobRow>(
762        r#"
763        SELECT * FROM awa.jobs
764        WHERE ($1::awa.job_state IS NULL OR state = $1)
765          AND ($2::text IS NULL OR kind = $2)
766          AND ($3::text IS NULL OR queue = $3)
767          AND ($4::text IS NULL OR tags @> ARRAY[$4]::text[])
768          AND ($5::bigint IS NULL OR id < $5)
769        ORDER BY id DESC
770        LIMIT $6
771        "#,
772    )
773    .bind(filter.state)
774    .bind(&filter.kind)
775    .bind(&filter.queue)
776    .bind(&filter.tag)
777    .bind(filter.before_id)
778    .bind(limit)
779    .fetch_all(executor)
780    .await?;
781
782    Ok(rows)
783}
784
785/// Get a single job by ID.
786pub async fn get_job<'e, E>(executor: E, job_id: i64) -> Result<JobRow, AwaError>
787where
788    E: PgExecutor<'e>,
789{
790    let row = sqlx::query_as::<_, JobRow>("SELECT * FROM awa.jobs WHERE id = $1")
791        .bind(job_id)
792        .fetch_optional(executor)
793        .await?;
794
795    row.ok_or(AwaError::JobNotFound { id: job_id })
796}
797
798/// Count jobs grouped by state.
799pub async fn state_counts<'e, E>(executor: E) -> Result<HashMap<JobState, i64>, AwaError>
800where
801    E: PgExecutor<'e>,
802{
803    let rows = sqlx::query_as::<_, (JobState, i64)>(
804        r#"
805        SELECT 'scheduled'::awa.job_state, COALESCE(sum(scheduled), 0)::bigint FROM awa.queue_state_counts
806        UNION ALL
807        SELECT 'available'::awa.job_state, COALESCE(sum(available), 0)::bigint FROM awa.queue_state_counts
808        UNION ALL
809        SELECT 'running'::awa.job_state, COALESCE(sum(running), 0)::bigint FROM awa.queue_state_counts
810        UNION ALL
811        SELECT 'completed'::awa.job_state, COALESCE(sum(completed), 0)::bigint FROM awa.queue_state_counts
812        UNION ALL
813        SELECT 'retryable'::awa.job_state, COALESCE(sum(retryable), 0)::bigint FROM awa.queue_state_counts
814        UNION ALL
815        SELECT 'failed'::awa.job_state, COALESCE(sum(failed), 0)::bigint FROM awa.queue_state_counts
816        UNION ALL
817        SELECT 'cancelled'::awa.job_state, COALESCE(sum(cancelled), 0)::bigint FROM awa.queue_state_counts
818        UNION ALL
819        SELECT 'waiting_external'::awa.job_state, COALESCE(sum(waiting_external), 0)::bigint FROM awa.queue_state_counts
820        "#,
821    )
822    .fetch_all(executor)
823    .await?;
824
825    Ok(rows.into_iter().collect())
826}
827
828/// Return all distinct job kinds.
829pub async fn distinct_kinds<'e, E>(executor: E) -> Result<Vec<String>, AwaError>
830where
831    E: PgExecutor<'e>,
832{
833    let rows = sqlx::query_scalar::<_, String>(
834        "SELECT kind FROM awa.job_kind_catalog WHERE ref_count > 0 ORDER BY kind",
835    )
836    .fetch_all(executor)
837    .await?;
838
839    Ok(rows)
840}
841
842/// Return all distinct queue names.
843pub async fn distinct_queues<'e, E>(executor: E) -> Result<Vec<String>, AwaError>
844where
845    E: PgExecutor<'e>,
846{
847    let rows = sqlx::query_scalar::<_, String>(
848        "SELECT queue FROM awa.job_queue_catalog WHERE ref_count > 0 ORDER BY queue",
849    )
850    .fetch_all(executor)
851    .await?;
852
853    Ok(rows)
854}
855
856/// Retry multiple jobs by ID. Only retries failed, cancelled, or waiting_external jobs.
857pub async fn bulk_retry<'e, E>(executor: E, ids: &[i64]) -> Result<Vec<JobRow>, AwaError>
858where
859    E: PgExecutor<'e>,
860{
861    let rows = sqlx::query_as::<_, JobRow>(
862        r#"
863        UPDATE awa.jobs
864        SET state = 'available', attempt = 0, run_at = now(),
865            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
866            callback_id = NULL, callback_timeout_at = NULL,
867            callback_filter = NULL, callback_on_complete = NULL,
868            callback_on_fail = NULL, callback_transform = NULL
869        WHERE id = ANY($1) AND state IN ('failed', 'cancelled', 'waiting_external')
870        RETURNING *
871        "#,
872    )
873    .bind(ids)
874    .fetch_all(executor)
875    .await?;
876
877    Ok(rows)
878}
879
880/// Cancel multiple jobs by ID. Only cancels non-terminal jobs.
881pub async fn bulk_cancel<'e, E>(executor: E, ids: &[i64]) -> Result<Vec<JobRow>, AwaError>
882where
883    E: PgExecutor<'e>,
884{
885    let rows = sqlx::query_as::<_, JobRow>(
886        r#"
887        UPDATE awa.jobs
888        SET state = 'cancelled', finalized_at = now(),
889            callback_id = NULL, callback_timeout_at = NULL,
890            callback_filter = NULL, callback_on_complete = NULL,
891            callback_on_fail = NULL, callback_transform = NULL
892        WHERE id = ANY($1) AND state NOT IN ('completed', 'failed', 'cancelled')
893        RETURNING *
894        "#,
895    )
896    .bind(ids)
897    .fetch_all(executor)
898    .await?;
899
900    Ok(rows)
901}
902
903/// A bucketed count of jobs by state over time.
904#[derive(Debug, Clone, Serialize)]
905pub struct StateTimeseriesBucket {
906    pub bucket: chrono::DateTime<chrono::Utc>,
907    pub state: JobState,
908    pub count: i64,
909}
910
911/// Return time-bucketed state counts over the last N minutes.
912pub async fn state_timeseries<'e, E>(
913    executor: E,
914    minutes: i32,
915) -> Result<Vec<StateTimeseriesBucket>, AwaError>
916where
917    E: PgExecutor<'e>,
918{
919    let rows = sqlx::query_as::<_, (chrono::DateTime<chrono::Utc>, JobState, i64)>(
920        r#"
921        SELECT
922            date_trunc('minute', created_at) AS bucket,
923            state,
924            count(*) AS count
925        FROM awa.jobs
926        WHERE created_at >= now() - make_interval(mins => $1)
927        GROUP BY bucket, state
928        ORDER BY bucket
929        "#,
930    )
931    .bind(minutes)
932    .fetch_all(executor)
933    .await?;
934
935    Ok(rows
936        .into_iter()
937        .map(|(bucket, state, count)| StateTimeseriesBucket {
938            bucket,
939            state,
940            count,
941        })
942        .collect())
943}
944
945/// Register a callback for a running job, writing the callback_id and timeout
946/// to the database immediately.
947///
948/// Call this BEFORE sending the callback_id to the external system to avoid
949/// the race condition where the external system fires before the DB knows
950/// about the callback.
951///
952/// Returns the generated callback UUID on success.
953pub async fn register_callback<'e, E>(
954    executor: E,
955    job_id: i64,
956    run_lease: i64,
957    timeout: std::time::Duration,
958) -> Result<Uuid, AwaError>
959where
960    E: PgExecutor<'e>,
961{
962    let callback_id = Uuid::new_v4();
963    let timeout_secs = timeout.as_secs_f64();
964    let result = sqlx::query(
965        r#"UPDATE awa.jobs
966           SET callback_id = $2,
967               callback_timeout_at = now() + make_interval(secs => $3),
968               callback_filter = NULL,
969               callback_on_complete = NULL,
970               callback_on_fail = NULL,
971               callback_transform = NULL
972           WHERE id = $1 AND state = 'running' AND run_lease = $4"#,
973    )
974    .bind(job_id)
975    .bind(callback_id)
976    .bind(timeout_secs)
977    .bind(run_lease)
978    .execute(executor)
979    .await?;
980    if result.rows_affected() == 0 {
981        return Err(AwaError::Validation("job is not in running state".into()));
982    }
983    Ok(callback_id)
984}
985
986/// Complete a waiting job via external callback.
987///
988/// Accepts jobs in `waiting_external` or `running` state (race handling: the
989/// external system may fire before the executor transitions to `waiting_external`).
990///
991/// The `payload` parameter is accepted but not stored on the job — it exists
992/// for callers who want to pass callback data through and will be used by the
993/// planned CEL expression filtering/transforms feature. Callers can process
994/// the payload immediately or enqueue a follow-up job with it.
995pub async fn complete_external<'e, E>(
996    executor: E,
997    callback_id: Uuid,
998    _payload: Option<serde_json::Value>,
999) -> Result<JobRow, AwaError>
1000where
1001    E: PgExecutor<'e>,
1002{
1003    let row = sqlx::query_as::<_, JobRow>(
1004        r#"
1005        UPDATE awa.jobs
1006        SET state = 'completed',
1007            finalized_at = now(),
1008            callback_id = NULL,
1009            callback_timeout_at = NULL,
1010            callback_filter = NULL,
1011            callback_on_complete = NULL,
1012            callback_on_fail = NULL,
1013            callback_transform = NULL,
1014            heartbeat_at = NULL,
1015            deadline_at = NULL,
1016            progress = NULL
1017        WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
1018        RETURNING *
1019        "#,
1020    )
1021    .bind(callback_id)
1022    .fetch_optional(executor)
1023    .await?;
1024
1025    row.ok_or(AwaError::CallbackNotFound {
1026        callback_id: callback_id.to_string(),
1027    })
1028}
1029
1030/// Fail a waiting job via external callback.
1031///
1032/// Records the error and transitions to `failed`.
1033pub async fn fail_external<'e, E>(
1034    executor: E,
1035    callback_id: Uuid,
1036    error: &str,
1037) -> Result<JobRow, AwaError>
1038where
1039    E: PgExecutor<'e>,
1040{
1041    let row = sqlx::query_as::<_, JobRow>(
1042        r#"
1043        UPDATE awa.jobs
1044        SET state = 'failed',
1045            finalized_at = now(),
1046            callback_id = NULL,
1047            callback_timeout_at = NULL,
1048            callback_filter = NULL,
1049            callback_on_complete = NULL,
1050            callback_on_fail = NULL,
1051            callback_transform = NULL,
1052            heartbeat_at = NULL,
1053            deadline_at = NULL,
1054            errors = errors || jsonb_build_object(
1055                'error', $2::text,
1056                'attempt', attempt,
1057                'at', now()
1058            )::jsonb
1059        WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
1060        RETURNING *
1061        "#,
1062    )
1063    .bind(callback_id)
1064    .bind(error)
1065    .fetch_optional(executor)
1066    .await?;
1067
1068    row.ok_or(AwaError::CallbackNotFound {
1069        callback_id: callback_id.to_string(),
1070    })
1071}
1072
1073/// Retry a waiting job via external callback.
1074///
1075/// Resets to `available` with attempt = 0. The handler must be idempotent
1076/// with respect to the external call — a retry re-executes from scratch.
1077///
1078/// Only accepts `waiting_external` state — unlike complete/fail which are
1079/// terminal transitions, retry puts the job back to `available`. Allowing
1080/// retry from `running` would risk concurrent dispatch if the original
1081/// handler hasn't finished yet.
1082pub async fn retry_external<'e, E>(executor: E, callback_id: Uuid) -> Result<JobRow, AwaError>
1083where
1084    E: PgExecutor<'e>,
1085{
1086    let row = sqlx::query_as::<_, JobRow>(
1087        r#"
1088        UPDATE awa.jobs
1089        SET state = 'available',
1090            attempt = 0,
1091            run_at = now(),
1092            finalized_at = NULL,
1093            callback_id = NULL,
1094            callback_timeout_at = NULL,
1095            callback_filter = NULL,
1096            callback_on_complete = NULL,
1097            callback_on_fail = NULL,
1098            callback_transform = NULL,
1099            heartbeat_at = NULL,
1100            deadline_at = NULL
1101        WHERE callback_id = $1 AND state = 'waiting_external'
1102        RETURNING *
1103        "#,
1104    )
1105    .bind(callback_id)
1106    .fetch_optional(executor)
1107    .await?;
1108
1109    row.ok_or(AwaError::CallbackNotFound {
1110        callback_id: callback_id.to_string(),
1111    })
1112}
1113
// ── CEL callback expressions ──────────────────────────────────────────
1115
/// Optional CEL expressions attached to a callback registration.
///
/// Every field may be left as `None`. With all of them unset, registering a
/// callback behaves like the plain `register_callback` (no expression
/// evaluation).
#[derive(Debug, Clone, Default)]
pub struct CallbackConfig {
    /// Gate: should this payload be processed at all? Returns bool.
    pub filter: Option<String>,
    /// Does this payload indicate success? Returns bool.
    pub on_complete: Option<String>,
    /// Does this payload indicate failure? Returns bool. Evaluated before on_complete.
    pub on_fail: Option<String>,
    /// Reshape payload before returning to caller. Returns any Value.
    pub transform: Option<String>,
}

impl CallbackConfig {
    /// Reports whether the configuration carries no expression at all.
    pub fn is_empty(&self) -> bool {
        // Destructure exhaustively so a newly added field cannot be forgotten here.
        let Self {
            filter,
            on_complete,
            on_fail,
            transform,
        } = self;

        [filter, on_complete, on_fail, transform]
            .iter()
            .all(|expression| expression.is_none())
    }
}
1141
/// Fallback behaviour for `resolve_callback` when no CEL condition matches
/// or the job has no expressions configured.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DefaultAction {
    /// Treat the callback as a successful completion.
    Complete,
    /// Treat the callback as a failure.
    Fail,
    /// Leave the job as it is.
    Ignore,
}
1150
1151/// Outcome of `resolve_callback`.
1152#[derive(Debug)]
1153pub enum ResolveOutcome {
1154    Completed {
1155        payload: Option<serde_json::Value>,
1156        job: JobRow,
1157    },
1158    Failed {
1159        job: JobRow,
1160    },
1161    Ignored {
1162        reason: String,
1163    },
1164}
1165
1166impl ResolveOutcome {
1167    pub fn is_completed(&self) -> bool {
1168        matches!(self, ResolveOutcome::Completed { .. })
1169    }
1170    pub fn is_failed(&self) -> bool {
1171        matches!(self, ResolveOutcome::Failed { .. })
1172    }
1173    pub fn is_ignored(&self) -> bool {
1174        matches!(self, ResolveOutcome::Ignored { .. })
1175    }
1176}
1177
1178/// Register a callback with optional CEL expressions.
1179///
1180/// When expressions are provided and the `cel` feature is enabled, each
1181/// expression is trial-compiled at registration time so syntax errors are
1182/// caught early.
1183///
1184/// When the `cel` feature is disabled and any expression is non-None,
1185/// returns `AwaError::Validation`.
1186pub async fn register_callback_with_config<'e, E>(
1187    executor: E,
1188    job_id: i64,
1189    run_lease: i64,
1190    timeout: std::time::Duration,
1191    config: &CallbackConfig,
1192) -> Result<Uuid, AwaError>
1193where
1194    E: PgExecutor<'e>,
1195{
1196    // Validate CEL expressions at registration time: compile + check references
1197    #[cfg(feature = "cel")]
1198    {
1199        for (name, expr) in [
1200            ("filter", &config.filter),
1201            ("on_complete", &config.on_complete),
1202            ("on_fail", &config.on_fail),
1203            ("transform", &config.transform),
1204        ] {
1205            if let Some(src) = expr {
1206                let program = cel::Program::compile(src).map_err(|e| {
1207                    AwaError::Validation(format!("invalid CEL expression for {name}: {e}"))
1208                })?;
1209
1210                // Reject undeclared variables — CEL only reports these at execution
1211                // time, so an expression like `missing == 1` would parse fine but
1212                // silently fall into the fail-open path at resolve time.
1213                let refs = program.references();
1214                let bad_vars: Vec<&str> = refs
1215                    .variables()
1216                    .into_iter()
1217                    .filter(|v| *v != "payload")
1218                    .collect();
1219                if !bad_vars.is_empty() {
1220                    return Err(AwaError::Validation(format!(
1221                        "CEL expression for {name} references undeclared variable(s): {}; \
1222                         only 'payload' is available",
1223                        bad_vars.join(", ")
1224                    )));
1225                }
1226            }
1227        }
1228    }
1229
1230    #[cfg(not(feature = "cel"))]
1231    {
1232        if !config.is_empty() {
1233            return Err(AwaError::Validation(
1234                "CEL expressions require the 'cel' feature".into(),
1235            ));
1236        }
1237    }
1238
1239    let callback_id = Uuid::new_v4();
1240    let timeout_secs = timeout.as_secs_f64();
1241
1242    let result = sqlx::query(
1243        r#"UPDATE awa.jobs
1244           SET callback_id = $2,
1245               callback_timeout_at = now() + make_interval(secs => $3),
1246               callback_filter = $4,
1247               callback_on_complete = $5,
1248               callback_on_fail = $6,
1249               callback_transform = $7
1250           WHERE id = $1 AND state = 'running' AND run_lease = $8"#,
1251    )
1252    .bind(job_id)
1253    .bind(callback_id)
1254    .bind(timeout_secs)
1255    .bind(&config.filter)
1256    .bind(&config.on_complete)
1257    .bind(&config.on_fail)
1258    .bind(&config.transform)
1259    .bind(run_lease)
1260    .execute(executor)
1261    .await?;
1262
1263    if result.rows_affected() == 0 {
1264        return Err(AwaError::Validation("job is not in running state".into()));
1265    }
1266    Ok(callback_id)
1267}
1268
1269/// Internal action decided by CEL evaluation or default.
1270enum ResolveAction {
1271    Complete(Option<serde_json::Value>),
1272    Fail {
1273        error: String,
1274        expression: Option<String>,
1275    },
1276    Ignore(String),
1277}
1278
1279/// Resolve a callback by evaluating CEL expressions against the payload.
1280///
1281/// Uses a transaction with `SELECT ... FOR UPDATE` for atomicity.
1282/// The `default_action` determines behaviour when no CEL conditions match
1283/// or no expressions are configured.
1284pub async fn resolve_callback(
1285    pool: &PgPool,
1286    callback_id: Uuid,
1287    payload: Option<serde_json::Value>,
1288    default_action: DefaultAction,
1289) -> Result<ResolveOutcome, AwaError> {
1290    let mut tx = pool.begin().await?;
1291
1292    // Query jobs_hot directly (not the awa.jobs UNION ALL view) because
1293    // FOR UPDATE is not reliably supported on UNION views. Waiting_external
1294    // jobs are always in jobs_hot (the check constraint on scheduled_jobs
1295    // only allows scheduled/retryable).
1296    let job = sqlx::query_as::<_, JobRow>(
1297        "SELECT * FROM awa.jobs_hot WHERE callback_id = $1
1298         AND state = 'waiting_external'
1299         FOR UPDATE",
1300    )
1301    .bind(callback_id)
1302    .fetch_optional(&mut *tx)
1303    .await?
1304    .ok_or(AwaError::CallbackNotFound {
1305        callback_id: callback_id.to_string(),
1306    })?;
1307
1308    let action = evaluate_or_default(&job, &payload, default_action)?;
1309
1310    match action {
1311        ResolveAction::Complete(transformed_payload) => {
1312            let completed_job = sqlx::query_as::<_, JobRow>(
1313                r#"
1314                UPDATE awa.jobs
1315                SET state = 'completed',
1316                    finalized_at = now(),
1317                    callback_id = NULL,
1318                    callback_timeout_at = NULL,
1319                    callback_filter = NULL,
1320                    callback_on_complete = NULL,
1321                    callback_on_fail = NULL,
1322                    callback_transform = NULL,
1323                    heartbeat_at = NULL,
1324                    deadline_at = NULL,
1325                    progress = NULL
1326                WHERE id = $1
1327                RETURNING *
1328                "#,
1329            )
1330            .bind(job.id)
1331            .fetch_one(&mut *tx)
1332            .await?;
1333
1334            tx.commit().await?;
1335            Ok(ResolveOutcome::Completed {
1336                payload: transformed_payload,
1337                job: completed_job,
1338            })
1339        }
1340        ResolveAction::Fail { error, expression } => {
1341            let mut error_json = serde_json::json!({
1342                "error": error,
1343                "attempt": job.attempt,
1344                "at": chrono::Utc::now().to_rfc3339(),
1345            });
1346            if let Some(expr) = expression {
1347                error_json["expression"] = serde_json::Value::String(expr);
1348            }
1349
1350            let failed_job = sqlx::query_as::<_, JobRow>(
1351                r#"
1352                UPDATE awa.jobs
1353                SET state = 'failed',
1354                    finalized_at = now(),
1355                    callback_id = NULL,
1356                    callback_timeout_at = NULL,
1357                    callback_filter = NULL,
1358                    callback_on_complete = NULL,
1359                    callback_on_fail = NULL,
1360                    callback_transform = NULL,
1361                    heartbeat_at = NULL,
1362                    deadline_at = NULL,
1363                    errors = errors || $2::jsonb
1364                WHERE id = $1
1365                RETURNING *
1366                "#,
1367            )
1368            .bind(job.id)
1369            .bind(error_json)
1370            .fetch_one(&mut *tx)
1371            .await?;
1372
1373            tx.commit().await?;
1374            Ok(ResolveOutcome::Failed { job: failed_job })
1375        }
1376        ResolveAction::Ignore(reason) => {
1377            // No state change — dropping tx releases FOR UPDATE lock
1378            Ok(ResolveOutcome::Ignored { reason })
1379        }
1380    }
1381}
1382
1383/// Evaluate CEL expressions or fall through to default_action.
1384fn evaluate_or_default(
1385    job: &JobRow,
1386    payload: &Option<serde_json::Value>,
1387    default_action: DefaultAction,
1388) -> Result<ResolveAction, AwaError> {
1389    let has_expressions = job.callback_filter.is_some()
1390        || job.callback_on_complete.is_some()
1391        || job.callback_on_fail.is_some()
1392        || job.callback_transform.is_some();
1393
1394    if !has_expressions {
1395        return Ok(apply_default(default_action, payload));
1396    }
1397
1398    #[cfg(feature = "cel")]
1399    {
1400        Ok(evaluate_cel(job, payload, default_action))
1401    }
1402
1403    #[cfg(not(feature = "cel"))]
1404    {
1405        // Expressions are present but CEL feature is not enabled.
1406        // Return an error without mutating the job — it stays in waiting_external.
1407        let _ = (payload, default_action);
1408        Err(AwaError::Validation(
1409            "CEL expressions present but 'cel' feature is not enabled".into(),
1410        ))
1411    }
1412}
1413
1414fn apply_default(
1415    default_action: DefaultAction,
1416    payload: &Option<serde_json::Value>,
1417) -> ResolveAction {
1418    match default_action {
1419        DefaultAction::Complete => ResolveAction::Complete(payload.clone()),
1420        DefaultAction::Fail => ResolveAction::Fail {
1421            error: "callback failed: default action".to_string(),
1422            expression: None,
1423        },
1424        DefaultAction::Ignore => {
1425            ResolveAction::Ignore("no expressions configured, default is ignore".to_string())
1426        }
1427    }
1428}
1429
/// Evaluate the job's CEL expressions against the payload and decide the
/// action to take.
///
/// Order of evaluation:
/// 1. `callback_filter` — an explicit `false` ignores the callback.
/// 2. `callback_on_fail` — an explicit `true` fails the job (takes precedence
///    over completion).
/// 3. `callback_on_complete` — an explicit `true` completes the job, applying
///    `callback_transform` to the payload.
/// 4. Otherwise `default_action` applies.
///
/// Evaluation errors are fail-open: a failing filter lets the payload
/// through, a failing `on_fail`/`on_complete` counts as "did not match".
/// A missing payload is evaluated as JSON `null`.
#[cfg(feature = "cel")]
fn evaluate_cel(
    job: &JobRow,
    payload: &Option<serde_json::Value>,
    default_action: DefaultAction,
) -> ResolveAction {
    let payload_value = payload.clone().unwrap_or(serde_json::Value::Null);

    // 1. Evaluate filter. Only an explicit `false` drops the callback;
    //    `true` and evaluation errors (fail-open) pass through.
    if let Some(filter_expr) = &job.callback_filter {
        if let Ok(false) = eval_bool(filter_expr, &payload_value, job.id, "filter") {
            return ResolveAction::Ignore("filter expression returned false".to_string());
        }
    }

    // 2. Evaluate on_fail (before on_complete — fail takes precedence).
    //    `false` and evaluation errors (fail-open) do not fail the job.
    if let Some(on_fail_expr) = &job.callback_on_fail {
        if let Ok(true) = eval_bool(on_fail_expr, &payload_value, job.id, "on_fail") {
            return ResolveAction::Fail {
                error: "callback failed: on_fail expression matched".to_string(),
                expression: Some(on_fail_expr.clone()),
            };
        }
    }

    // 3. Evaluate on_complete. `false` and evaluation errors (fail-open) do
    //    not complete the job.
    if let Some(on_complete_expr) = &job.callback_on_complete {
        if let Ok(true) = eval_bool(on_complete_expr, &payload_value, job.id, "on_complete") {
            // Complete with optional transform
            let transformed = apply_transform(job, &payload_value);
            return ResolveAction::Complete(Some(transformed));
        }
    }

    // 4. Neither condition matched → apply default_action.
    //
    // NOTE(review): `callback_transform` is only applied on the on_complete
    // path above; a default `Complete` hands back the untransformed payload.
    // Confirm this is intended.
    match default_action {
        // Expressions ARE configured on this path, so the generic
        // "no expressions configured" reason from `apply_default` would be
        // misleading; report what actually happened.
        DefaultAction::Ignore => {
            ResolveAction::Ignore("no callback expression matched, default is ignore".to_string())
        }
        DefaultAction::Complete | DefaultAction::Fail => apply_default(default_action, payload),
    }
}
1485
/// Compile and run a CEL expression that is expected to yield a boolean.
///
/// The payload is exposed to the expression as the variable `payload`.
/// Returns `Ok` with the boolean result. Compilation failures, failures to
/// bind the payload, execution errors and non-boolean results are each logged
/// at `warn` level and reported as `Err(())`; the caller decides how to treat
/// them. `job_id` and `expression_name` are used for logging only.
#[cfg(feature = "cel")]
fn eval_bool(
    expression: &str,
    payload_value: &serde_json::Value,
    job_id: i64,
    expression_name: &str,
) -> Result<bool, ()> {
    let program = cel::Program::compile(expression).map_err(|err| {
        tracing::warn!(
            job_id,
            expression_name,
            expression,
            error = %err,
            "CEL compilation error during evaluation"
        );
    })?;

    let mut context = cel::Context::default();
    context
        .add_variable("payload", payload_value.clone())
        .map_err(|err| {
            tracing::warn!(
                job_id,
                expression_name,
                error = %err,
                "Failed to add payload variable to CEL context"
            );
        })?;

    let evaluated = program.execute(&context).map_err(|err| {
        tracing::warn!(
            job_id,
            expression_name,
            expression,
            error = %err,
            "CEL execution error"
        );
    })?;

    if let cel::Value::Bool(flag) = evaluated {
        return Ok(flag);
    }

    tracing::warn!(
        job_id,
        expression_name,
        expression,
        result_type = ?evaluated.type_of(),
        "CEL expression returned non-bool"
    );
    Err(())
}
1542
/// Run the job's `callback_transform` expression over the payload.
///
/// Without a transform the payload is returned unchanged (cloned). The
/// payload is exposed to the expression as the variable `payload`. Any
/// problem — compilation, binding the payload, execution, or converting the
/// result to JSON — is logged at `warn` level and the original payload is
/// returned instead.
#[cfg(feature = "cel")]
fn apply_transform(job: &JobRow, payload_value: &serde_json::Value) -> serde_json::Value {
    let transform_expr = match job.callback_transform.as_ref() {
        None => return payload_value.clone(),
        Some(expr) => expr,
    };

    let program = match cel::Program::compile(transform_expr) {
        Ok(compiled) => compiled,
        Err(err) => {
            tracing::warn!(
                job_id = job.id,
                expression = transform_expr,
                error = %err,
                "CEL transform compilation error, using original payload"
            );
            return payload_value.clone();
        }
    };

    let mut context = cel::Context::default();
    if let Err(err) = context.add_variable("payload", payload_value.clone()) {
        tracing::warn!(
            job_id = job.id,
            error = %err,
            "Failed to add payload variable for transform"
        );
        return payload_value.clone();
    }

    let evaluated = match program.execute(&context) {
        Ok(value) => value,
        Err(err) => {
            tracing::warn!(
                job_id = job.id,
                expression = transform_expr,
                error = %err,
                "CEL transform execution error, using original payload"
            );
            return payload_value.clone();
        }
    };

    match evaluated.json() {
        Ok(json) => json,
        Err(err) => {
            tracing::warn!(
                job_id = job.id,
                expression = transform_expr,
                error = %err,
                "CEL transform result could not be converted to JSON, using original payload"
            );
            payload_value.clone()
        }
    }
}