Skip to main content

awa_model/
admin.rs

1use crate::error::AwaError;
2use crate::job::{JobRow, JobState};
3use chrono::{DateTime, Duration, Utc};
4use serde::{Deserialize, Serialize};
5use sqlx::types::Json;
6use sqlx::PgExecutor;
7use sqlx::PgPool;
8use std::cmp::max;
9use std::collections::HashMap;
10use uuid::Uuid;
11
12/// Retry a single failed, cancelled, or waiting_external job.
13pub async fn retry<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
14where
15    E: PgExecutor<'e>,
16{
17    sqlx::query_as::<_, JobRow>(
18        r#"
19        UPDATE awa.jobs
20        SET state = 'available', attempt = 0, run_at = now(),
21            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
22            callback_id = NULL, callback_timeout_at = NULL,
23            callback_filter = NULL, callback_on_complete = NULL,
24            callback_on_fail = NULL, callback_transform = NULL
25        WHERE id = $1 AND state IN ('failed', 'cancelled', 'waiting_external')
26        RETURNING *
27        "#,
28    )
29    .bind(job_id)
30    .fetch_optional(executor)
31    .await?
32    .ok_or(AwaError::JobNotFound { id: job_id })
33    .map(Some)
34}
35
36/// Cancel a single non-terminal job.
37pub async fn cancel<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
38where
39    E: PgExecutor<'e>,
40{
41    sqlx::query_as::<_, JobRow>(
42        r#"
43        UPDATE awa.jobs
44        SET state = 'cancelled', finalized_at = now(),
45            callback_id = NULL, callback_timeout_at = NULL,
46            callback_filter = NULL, callback_on_complete = NULL,
47            callback_on_fail = NULL, callback_transform = NULL
48        WHERE id = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
49        RETURNING *
50        "#,
51    )
52    .bind(job_id)
53    .fetch_optional(executor)
54    .await?
55    .ok_or(AwaError::JobNotFound { id: job_id })
56    .map(Some)
57}
58
59/// Cancel a job by its unique key components.
60///
61/// Reconstructs the BLAKE3 unique key from the same inputs used at insert time
62/// (kind, optional queue, optional args, optional period bucket), then cancels
63/// the single oldest matching non-terminal job. Returns `None` if no matching
64/// job was found (already completed, already cancelled, or never existed).
65///
66/// The parameters must match what was used at insert time: pass `queue` only if
67/// the original `UniqueOpts` had `by_queue: true`, `args` only if `by_args: true`,
68/// and `period_bucket` only if `by_period` was set. Mismatched components produce
69/// a different hash and the job won't be found.
70///
71/// Only one job is cancelled per call (the oldest by `id`). This is intentional:
72/// unique key enforcement uses a state bitmask, so multiple rows with the same
73/// key can legally coexist (e.g., one `waiting_external` + one `available`).
74/// Cancelling all of them in one shot would be surprising.
75///
76/// This is useful when the caller knows the job kind and args but not the job ID —
77/// e.g., cancelling a scheduled reminder when the triggering condition is resolved.
78///
79/// # Implementation notes
80///
81/// Queries `jobs_hot` and `scheduled_jobs` directly rather than the `awa.jobs`
82/// UNION ALL view, because PostgreSQL does not support `FOR UPDATE` on UNION
83/// views. The CTE selects candidate IDs without row locks; blocking on a
84/// concurrently-locked row (e.g., one being processed by a worker) happens
85/// implicitly during the UPDATE phase via the writable view trigger. If the
86/// worker completes the job before the UPDATE acquires the lock, the state
87/// check (`NOT IN ('completed', 'failed', 'cancelled')`) causes the cancel
88/// to no-op and return `None`.
89///
90/// The lookup scans `unique_key` on both physical tables without a dedicated
91/// index. This is acceptable for low-volume use cases. For high-volume tables,
92/// consider adding a partial index on `unique_key WHERE unique_key IS NOT NULL`
93/// or routing through `job_unique_claims` (which is already indexed).
94pub async fn cancel_by_unique_key<'e, E>(
95    executor: E,
96    kind: &str,
97    queue: Option<&str>,
98    args: Option<&serde_json::Value>,
99    period_bucket: Option<i64>,
100) -> Result<Option<JobRow>, AwaError>
101where
102    E: PgExecutor<'e>,
103{
104    let unique_key = crate::unique::compute_unique_key(kind, queue, args, period_bucket);
105
106    // Find the oldest matching job across both physical tables. CTE selects
107    // candidate IDs without row locks; blocking on concurrently-locked rows
108    // happens implicitly during the UPDATE via the writable view trigger.
109    let row = sqlx::query_as::<_, JobRow>(
110        r#"
111        WITH candidates AS (
112            SELECT id FROM awa.jobs_hot
113            WHERE unique_key = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
114            UNION ALL
115            SELECT id FROM awa.scheduled_jobs
116            WHERE unique_key = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
117            ORDER BY id ASC
118            LIMIT 1
119        )
120        UPDATE awa.jobs
121        SET state = 'cancelled', finalized_at = now(),
122            callback_id = NULL, callback_timeout_at = NULL,
123            callback_filter = NULL, callback_on_complete = NULL,
124            callback_on_fail = NULL, callback_transform = NULL
125        FROM candidates
126        WHERE awa.jobs.id = candidates.id
127        RETURNING awa.jobs.*
128        "#,
129    )
130    .bind(&unique_key)
131    .fetch_optional(executor)
132    .await?;
133
134    Ok(row)
135}
136
137/// Retry all failed jobs of a given kind.
138pub async fn retry_failed_by_kind<'e, E>(executor: E, kind: &str) -> Result<Vec<JobRow>, AwaError>
139where
140    E: PgExecutor<'e>,
141{
142    let rows = sqlx::query_as::<_, JobRow>(
143        r#"
144        UPDATE awa.jobs
145        SET state = 'available', attempt = 0, run_at = now(),
146            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
147        WHERE kind = $1 AND state = 'failed'
148        RETURNING *
149        "#,
150    )
151    .bind(kind)
152    .fetch_all(executor)
153    .await?;
154
155    Ok(rows)
156}
157
158/// Retry all failed jobs in a given queue.
159pub async fn retry_failed_by_queue<'e, E>(executor: E, queue: &str) -> Result<Vec<JobRow>, AwaError>
160where
161    E: PgExecutor<'e>,
162{
163    let rows = sqlx::query_as::<_, JobRow>(
164        r#"
165        UPDATE awa.jobs
166        SET state = 'available', attempt = 0, run_at = now(),
167            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
168        WHERE queue = $1 AND state = 'failed'
169        RETURNING *
170        "#,
171    )
172    .bind(queue)
173    .fetch_all(executor)
174    .await?;
175
176    Ok(rows)
177}
178
179/// Discard (delete) all failed jobs of a given kind.
180pub async fn discard_failed<'e, E>(executor: E, kind: &str) -> Result<u64, AwaError>
181where
182    E: PgExecutor<'e>,
183{
184    let result = sqlx::query("DELETE FROM awa.jobs WHERE kind = $1 AND state = 'failed'")
185        .bind(kind)
186        .execute(executor)
187        .await?;
188
189    Ok(result.rows_affected())
190}
191
192/// Pause a queue. Affects all workers immediately.
193pub async fn pause_queue<'e, E>(
194    executor: E,
195    queue: &str,
196    paused_by: Option<&str>,
197) -> Result<(), AwaError>
198where
199    E: PgExecutor<'e>,
200{
201    sqlx::query(
202        r#"
203        INSERT INTO awa.queue_meta (queue, paused, paused_at, paused_by)
204        VALUES ($1, TRUE, now(), $2)
205        ON CONFLICT (queue) DO UPDATE SET paused = TRUE, paused_at = now(), paused_by = $2
206        "#,
207    )
208    .bind(queue)
209    .bind(paused_by)
210    .execute(executor)
211    .await?;
212
213    Ok(())
214}
215
216/// Resume a paused queue.
217pub async fn resume_queue<'e, E>(executor: E, queue: &str) -> Result<(), AwaError>
218where
219    E: PgExecutor<'e>,
220{
221    sqlx::query("UPDATE awa.queue_meta SET paused = FALSE WHERE queue = $1")
222        .bind(queue)
223        .execute(executor)
224        .await?;
225
226    Ok(())
227}
228
229/// Drain a queue: cancel all non-running, non-terminal jobs.
230pub async fn drain_queue<'e, E>(executor: E, queue: &str) -> Result<u64, AwaError>
231where
232    E: PgExecutor<'e>,
233{
234    let result = sqlx::query(
235        r#"
236        UPDATE awa.jobs
237        SET state = 'cancelled', finalized_at = now(),
238            callback_id = NULL, callback_timeout_at = NULL,
239            callback_filter = NULL, callback_on_complete = NULL,
240            callback_on_fail = NULL, callback_transform = NULL
241        WHERE queue = $1 AND state IN ('available', 'scheduled', 'retryable', 'waiting_external')
242        "#,
243    )
244    .bind(queue)
245    .execute(executor)
246    .await?;
247
248    Ok(result.rows_affected())
249}
250
251/// Queue statistics.
252#[derive(Debug, Clone, Serialize)]
253pub struct QueueStats {
254    pub queue: String,
255    /// All non-terminal jobs for the queue, including running and waiting_external.
256    pub total_queued: i64,
257    pub scheduled: i64,
258    pub available: i64,
259    pub retryable: i64,
260    pub running: i64,
261    pub failed: i64,
262    pub waiting_external: i64,
263    pub completed_last_hour: i64,
264    pub lag_seconds: Option<f64>,
265    pub paused: bool,
266}
267
268/// Snapshot of a per-queue rate limit configuration.
269#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
270pub struct RateLimitSnapshot {
271    pub max_rate: f64,
272    pub burst: u32,
273}
274
275/// Runtime concurrency mode for a queue.
276#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
277#[serde(rename_all = "snake_case")]
278pub enum QueueRuntimeMode {
279    HardReserved,
280    Weighted,
281}
282
283/// Per-queue configuration published by a worker runtime instance.
284#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
285pub struct QueueRuntimeConfigSnapshot {
286    pub mode: QueueRuntimeMode,
287    pub max_workers: Option<u32>,
288    pub min_workers: Option<u32>,
289    pub weight: Option<u32>,
290    pub global_max_workers: Option<u32>,
291    pub poll_interval_ms: u64,
292    pub deadline_duration_secs: u64,
293    pub priority_aging_interval_secs: u64,
294    pub rate_limit: Option<RateLimitSnapshot>,
295}
296
297/// Runtime state for one queue on one worker instance.
298#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
299pub struct QueueRuntimeSnapshot {
300    pub queue: String,
301    pub in_flight: u32,
302    pub overflow_held: Option<u32>,
303    pub config: QueueRuntimeConfigSnapshot,
304}
305
306/// Data written by a worker runtime into the observability snapshot table.
307#[derive(Debug, Clone, Serialize, Deserialize)]
308pub struct RuntimeSnapshotInput {
309    pub instance_id: Uuid,
310    pub hostname: Option<String>,
311    pub pid: i32,
312    pub version: String,
313    pub started_at: DateTime<Utc>,
314    pub snapshot_interval_ms: i64,
315    pub healthy: bool,
316    pub postgres_connected: bool,
317    pub poll_loop_alive: bool,
318    pub heartbeat_alive: bool,
319    pub maintenance_alive: bool,
320    pub shutting_down: bool,
321    pub leader: bool,
322    pub global_max_workers: Option<u32>,
323    pub queues: Vec<QueueRuntimeSnapshot>,
324}
325
326/// A worker runtime instance as exposed through the admin API.
327#[derive(Debug, Clone, Serialize, Deserialize)]
328pub struct RuntimeInstance {
329    pub instance_id: Uuid,
330    pub hostname: Option<String>,
331    pub pid: i32,
332    pub version: String,
333    pub started_at: DateTime<Utc>,
334    pub last_seen_at: DateTime<Utc>,
335    pub snapshot_interval_ms: i64,
336    pub stale: bool,
337    pub healthy: bool,
338    pub postgres_connected: bool,
339    pub poll_loop_alive: bool,
340    pub heartbeat_alive: bool,
341    pub maintenance_alive: bool,
342    pub shutting_down: bool,
343    pub leader: bool,
344    pub global_max_workers: Option<u32>,
345    pub queues: Vec<QueueRuntimeSnapshot>,
346}
347
348impl RuntimeInstance {
349    fn stale_cutoff(interval_ms: i64) -> Duration {
350        let interval_ms = max(interval_ms, 1_000);
351        Duration::milliseconds(max(interval_ms.saturating_mul(3), 30_000))
352    }
353
354    fn from_db_row(row: RuntimeInstanceRow, now: DateTime<Utc>) -> Self {
355        let stale = row.last_seen_at + Self::stale_cutoff(row.snapshot_interval_ms) < now;
356        Self {
357            instance_id: row.instance_id,
358            hostname: row.hostname,
359            pid: row.pid,
360            version: row.version,
361            started_at: row.started_at,
362            last_seen_at: row.last_seen_at,
363            snapshot_interval_ms: row.snapshot_interval_ms,
364            stale,
365            healthy: row.healthy,
366            postgres_connected: row.postgres_connected,
367            poll_loop_alive: row.poll_loop_alive,
368            heartbeat_alive: row.heartbeat_alive,
369            maintenance_alive: row.maintenance_alive,
370            shutting_down: row.shutting_down,
371            leader: row.leader,
372            global_max_workers: row.global_max_workers.map(|v| v as u32),
373            queues: row.queues.0,
374        }
375    }
376}
377
378/// Cluster-wide runtime overview.
379#[derive(Debug, Clone, Serialize, Deserialize)]
380pub struct RuntimeOverview {
381    pub total_instances: usize,
382    pub live_instances: usize,
383    pub stale_instances: usize,
384    pub healthy_instances: usize,
385    pub leader_instances: usize,
386    pub instances: Vec<RuntimeInstance>,
387}
388
389/// Queue-centric runtime/config summary aggregated across worker instances.
390#[derive(Debug, Clone, Serialize, Deserialize)]
391pub struct QueueRuntimeSummary {
392    pub queue: String,
393    pub instance_count: usize,
394    pub live_instances: usize,
395    pub stale_instances: usize,
396    pub healthy_instances: usize,
397    pub total_in_flight: u64,
398    pub overflow_held_total: Option<u64>,
399    pub config_mismatch: bool,
400    pub config: Option<QueueRuntimeConfigSnapshot>,
401}
402
403#[derive(Debug, sqlx::FromRow)]
404struct RuntimeInstanceRow {
405    instance_id: Uuid,
406    hostname: Option<String>,
407    pid: i32,
408    version: String,
409    started_at: DateTime<Utc>,
410    last_seen_at: DateTime<Utc>,
411    snapshot_interval_ms: i64,
412    healthy: bool,
413    postgres_connected: bool,
414    poll_loop_alive: bool,
415    heartbeat_alive: bool,
416    maintenance_alive: bool,
417    shutting_down: bool,
418    leader: bool,
419    global_max_workers: Option<i32>,
420    queues: Json<Vec<QueueRuntimeSnapshot>>,
421}
422
423/// Upsert a runtime observability snapshot for one worker instance.
424pub async fn upsert_runtime_snapshot<'e, E>(
425    executor: E,
426    snapshot: &RuntimeSnapshotInput,
427) -> Result<(), AwaError>
428where
429    E: PgExecutor<'e>,
430{
431    sqlx::query(
432        r#"
433        INSERT INTO awa.runtime_instances (
434            instance_id,
435            hostname,
436            pid,
437            version,
438            started_at,
439            last_seen_at,
440            snapshot_interval_ms,
441            healthy,
442            postgres_connected,
443            poll_loop_alive,
444            heartbeat_alive,
445            maintenance_alive,
446            shutting_down,
447            leader,
448            global_max_workers,
449            queues
450        )
451        VALUES (
452            $1, $2, $3, $4, $5, now(), $6, $7, $8, $9, $10, $11, $12, $13, $14, $15
453        )
454        ON CONFLICT (instance_id) DO UPDATE SET
455            hostname = EXCLUDED.hostname,
456            pid = EXCLUDED.pid,
457            version = EXCLUDED.version,
458            started_at = EXCLUDED.started_at,
459            last_seen_at = now(),
460            snapshot_interval_ms = EXCLUDED.snapshot_interval_ms,
461            healthy = EXCLUDED.healthy,
462            postgres_connected = EXCLUDED.postgres_connected,
463            poll_loop_alive = EXCLUDED.poll_loop_alive,
464            heartbeat_alive = EXCLUDED.heartbeat_alive,
465            maintenance_alive = EXCLUDED.maintenance_alive,
466            shutting_down = EXCLUDED.shutting_down,
467            leader = EXCLUDED.leader,
468            global_max_workers = EXCLUDED.global_max_workers,
469            queues = EXCLUDED.queues
470        "#,
471    )
472    .bind(snapshot.instance_id)
473    .bind(snapshot.hostname.as_deref())
474    .bind(snapshot.pid)
475    .bind(&snapshot.version)
476    .bind(snapshot.started_at)
477    .bind(snapshot.snapshot_interval_ms)
478    .bind(snapshot.healthy)
479    .bind(snapshot.postgres_connected)
480    .bind(snapshot.poll_loop_alive)
481    .bind(snapshot.heartbeat_alive)
482    .bind(snapshot.maintenance_alive)
483    .bind(snapshot.shutting_down)
484    .bind(snapshot.leader)
485    .bind(snapshot.global_max_workers.map(|v| v as i32))
486    .bind(Json(&snapshot.queues))
487    .execute(executor)
488    .await?;
489
490    Ok(())
491}
492
493/// Opportunistically delete long-stale runtime snapshot rows.
494pub async fn cleanup_runtime_snapshots<'e, E>(
495    executor: E,
496    max_age: Duration,
497) -> Result<u64, AwaError>
498where
499    E: PgExecutor<'e>,
500{
501    let seconds = max(max_age.num_seconds(), 1);
502    let result = sqlx::query(
503        "DELETE FROM awa.runtime_instances WHERE last_seen_at < now() - make_interval(secs => $1)",
504    )
505    .bind(seconds)
506    .execute(executor)
507    .await?;
508
509    Ok(result.rows_affected())
510}
511
512/// List all runtime instances ordered with leader/live instances first.
513pub async fn list_runtime_instances<'e, E>(executor: E) -> Result<Vec<RuntimeInstance>, AwaError>
514where
515    E: PgExecutor<'e>,
516{
517    let rows = sqlx::query_as::<_, RuntimeInstanceRow>(
518        r#"
519        SELECT
520            instance_id,
521            hostname,
522            pid,
523            version,
524            started_at,
525            last_seen_at,
526            snapshot_interval_ms,
527            healthy,
528            postgres_connected,
529            poll_loop_alive,
530            heartbeat_alive,
531            maintenance_alive,
532            shutting_down,
533            leader,
534            global_max_workers,
535            queues
536        FROM awa.runtime_instances
537        ORDER BY leader DESC, last_seen_at DESC, started_at DESC
538        "#,
539    )
540    .fetch_all(executor)
541    .await?;
542
543    let now = Utc::now();
544    Ok(rows
545        .into_iter()
546        .map(|row| RuntimeInstance::from_db_row(row, now))
547        .collect())
548}
549
550/// Cluster runtime overview with instance list.
551pub async fn runtime_overview<'e, E>(executor: E) -> Result<RuntimeOverview, AwaError>
552where
553    E: PgExecutor<'e>,
554{
555    let instances = list_runtime_instances(executor).await?;
556    let total_instances = instances.len();
557    let stale_instances = instances.iter().filter(|i| i.stale).count();
558    let live_instances = total_instances.saturating_sub(stale_instances);
559    let healthy_instances = instances.iter().filter(|i| !i.stale && i.healthy).count();
560    let leader_instances = instances.iter().filter(|i| !i.stale && i.leader).count();
561
562    Ok(RuntimeOverview {
563        total_instances,
564        live_instances,
565        stale_instances,
566        healthy_instances,
567        leader_instances,
568        instances,
569    })
570}
571
572/// Queue runtime/config summary aggregated across worker snapshots.
573pub async fn queue_runtime_summary<'e, E>(executor: E) -> Result<Vec<QueueRuntimeSummary>, AwaError>
574where
575    E: PgExecutor<'e>,
576{
577    let instances = list_runtime_instances(executor).await?;
578    let mut by_queue: HashMap<String, Vec<(bool, bool, QueueRuntimeSnapshot)>> = HashMap::new();
579
580    for instance in instances {
581        let is_live = !instance.stale;
582        let is_healthy = is_live && instance.healthy;
583        for queue in instance.queues {
584            by_queue
585                .entry(queue.queue.clone())
586                .or_default()
587                .push((is_live, is_healthy, queue));
588        }
589    }
590
591    let mut summaries: Vec<_> = by_queue
592        .into_iter()
593        .map(|(queue, entries)| {
594            let instance_count = entries.len();
595            let live_instances = entries.iter().filter(|(live, _, _)| *live).count();
596            let stale_instances = instance_count.saturating_sub(live_instances);
597            let healthy_instances = entries.iter().filter(|(_, healthy, _)| *healthy).count();
598            let total_in_flight = entries
599                .iter()
600                .filter(|(live, _, _)| *live)
601                .map(|(_, _, queue)| u64::from(queue.in_flight))
602                .sum();
603
604            let overflow_total: u64 = entries
605                .iter()
606                .filter(|(live, _, _)| *live)
607                .filter_map(|(_, _, queue)| queue.overflow_held.map(u64::from))
608                .sum();
609
610            let live_configs: Vec<_> = entries
611                .iter()
612                .filter(|(live, _, _)| *live)
613                .map(|(_, _, queue)| queue.config.clone())
614                .collect();
615            let config_candidates = if live_configs.is_empty() {
616                entries
617                    .iter()
618                    .map(|(_, _, queue)| queue.config.clone())
619                    .collect::<Vec<_>>()
620            } else {
621                live_configs
622            };
623            let config = config_candidates.first().cloned();
624            let config_mismatch = config_candidates
625                .iter()
626                .skip(1)
627                .any(|candidate| Some(candidate) != config.as_ref());
628
629            QueueRuntimeSummary {
630                queue,
631                instance_count,
632                live_instances,
633                stale_instances,
634                healthy_instances,
635                total_in_flight,
636                overflow_held_total: config
637                    .as_ref()
638                    .filter(|cfg| cfg.mode == QueueRuntimeMode::Weighted)
639                    .map(|_| overflow_total),
640                config_mismatch,
641                config,
642            }
643        })
644        .collect();
645
646    summaries.sort_by(|a, b| a.queue.cmp(&b.queue));
647    Ok(summaries)
648}
649
650/// Get statistics for all queues.
651pub async fn queue_stats<'e, E>(executor: E) -> Result<Vec<QueueStats>, AwaError>
652where
653    E: PgExecutor<'e>,
654{
655    let rows = sqlx::query_as::<
656        _,
657        (
658            String,
659            i64,
660            i64,
661            i64,
662            i64,
663            i64,
664            i64,
665            i64,
666            i64,
667            Option<f64>,
668            bool,
669        ),
670    >(
671        r#"
672        WITH available_lag AS (
673            SELECT
674                queue,
675                EXTRACT(EPOCH FROM (now() - min(run_at)))::float8 AS lag_seconds
676            FROM awa.jobs_hot
677            WHERE state = 'available'
678            GROUP BY queue
679        ),
680        completed_recent AS (
681            SELECT
682                queue,
683                count(*)::bigint AS completed_last_hour
684            FROM awa.jobs_hot
685            WHERE state = 'completed'
686              AND finalized_at > now() - interval '1 hour'
687            GROUP BY queue
688        )
689        SELECT
690            qs.queue,
691            qs.scheduled + qs.available + qs.running + qs.retryable + qs.waiting_external AS total_queued,
692            qs.scheduled,
693            qs.available,
694            qs.retryable,
695            qs.running,
696            qs.failed,
697            qs.waiting_external,
698            COALESCE(cr.completed_last_hour, 0) AS completed_last_hour,
699            al.lag_seconds,
700            COALESCE(qm.paused, FALSE) AS paused
701        FROM awa.queue_state_counts qs
702        LEFT JOIN available_lag al ON al.queue = qs.queue
703        LEFT JOIN completed_recent cr ON cr.queue = qs.queue
704        LEFT JOIN awa.queue_meta qm ON qm.queue = qs.queue
705        ORDER BY qs.queue
706        "#,
707    )
708    .fetch_all(executor)
709    .await?;
710
711    Ok(rows
712        .into_iter()
713        .map(
714            |(
715                queue,
716                total_queued,
717                scheduled,
718                available,
719                retryable,
720                running,
721                failed,
722                waiting_external,
723                completed_last_hour,
724                lag_seconds,
725                paused,
726            )| QueueStats {
727                queue,
728                total_queued,
729                scheduled,
730                available,
731                retryable,
732                running,
733                failed,
734                waiting_external,
735                completed_last_hour,
736                lag_seconds,
737                paused,
738            },
739        )
740        .collect())
741}
742
743/// List jobs with optional filters.
744#[derive(Debug, Clone, Default, Serialize)]
745pub struct ListJobsFilter {
746    pub state: Option<JobState>,
747    pub kind: Option<String>,
748    pub queue: Option<String>,
749    pub tag: Option<String>,
750    pub before_id: Option<i64>,
751    pub limit: Option<i64>,
752}
753
754/// List jobs matching the given filter.
755pub async fn list_jobs<'e, E>(executor: E, filter: &ListJobsFilter) -> Result<Vec<JobRow>, AwaError>
756where
757    E: PgExecutor<'e>,
758{
759    let limit = filter.limit.unwrap_or(100);
760
761    let rows = sqlx::query_as::<_, JobRow>(
762        r#"
763        SELECT * FROM awa.jobs
764        WHERE ($1::awa.job_state IS NULL OR state = $1)
765          AND ($2::text IS NULL OR kind = $2)
766          AND ($3::text IS NULL OR queue = $3)
767          AND ($4::text IS NULL OR tags @> ARRAY[$4]::text[])
768          AND ($5::bigint IS NULL OR id < $5)
769        ORDER BY id DESC
770        LIMIT $6
771        "#,
772    )
773    .bind(filter.state)
774    .bind(&filter.kind)
775    .bind(&filter.queue)
776    .bind(&filter.tag)
777    .bind(filter.before_id)
778    .bind(limit)
779    .fetch_all(executor)
780    .await?;
781
782    Ok(rows)
783}
784
785/// Get a single job by ID.
786pub async fn get_job<'e, E>(executor: E, job_id: i64) -> Result<JobRow, AwaError>
787where
788    E: PgExecutor<'e>,
789{
790    let row = sqlx::query_as::<_, JobRow>("SELECT * FROM awa.jobs WHERE id = $1")
791        .bind(job_id)
792        .fetch_optional(executor)
793        .await?;
794
795    row.ok_or(AwaError::JobNotFound { id: job_id })
796}
797
798/// Count jobs grouped by state.
799pub async fn state_counts<'e, E>(executor: E) -> Result<HashMap<JobState, i64>, AwaError>
800where
801    E: PgExecutor<'e>,
802{
803    // Single scan of queue_state_counts — sums all columns in one pass
804    // then unpivots via VALUES join.
805    let rows = sqlx::query_as::<_, (JobState, i64)>(
806        r#"
807        SELECT v.state, v.total FROM (
808            SELECT
809                COALESCE(sum(scheduled), 0)::bigint      AS scheduled,
810                COALESCE(sum(available), 0)::bigint      AS available,
811                COALESCE(sum(running), 0)::bigint        AS running,
812                COALESCE(sum(completed), 0)::bigint      AS completed,
813                COALESCE(sum(retryable), 0)::bigint      AS retryable,
814                COALESCE(sum(failed), 0)::bigint         AS failed,
815                COALESCE(sum(cancelled), 0)::bigint      AS cancelled,
816                COALESCE(sum(waiting_external), 0)::bigint AS waiting_external
817            FROM awa.queue_state_counts
818        ) s,
819        LATERAL (VALUES
820            ('scheduled'::awa.job_state,        s.scheduled),
821            ('available'::awa.job_state,        s.available),
822            ('running'::awa.job_state,          s.running),
823            ('completed'::awa.job_state,        s.completed),
824            ('retryable'::awa.job_state,        s.retryable),
825            ('failed'::awa.job_state,           s.failed),
826            ('cancelled'::awa.job_state,        s.cancelled),
827            ('waiting_external'::awa.job_state, s.waiting_external)
828        ) AS v(state, total)
829        "#,
830    )
831    .fetch_all(executor)
832    .await?;
833
834    Ok(rows.into_iter().collect())
835}
836
837/// Return all distinct job kinds.
838pub async fn distinct_kinds<'e, E>(executor: E) -> Result<Vec<String>, AwaError>
839where
840    E: PgExecutor<'e>,
841{
842    let rows = sqlx::query_scalar::<_, String>(
843        "SELECT kind FROM awa.job_kind_catalog WHERE ref_count > 0 ORDER BY kind",
844    )
845    .fetch_all(executor)
846    .await?;
847
848    Ok(rows)
849}
850
851/// Return all distinct queue names.
852pub async fn distinct_queues<'e, E>(executor: E) -> Result<Vec<String>, AwaError>
853where
854    E: PgExecutor<'e>,
855{
856    let rows = sqlx::query_scalar::<_, String>(
857        "SELECT queue FROM awa.job_queue_catalog WHERE ref_count > 0 ORDER BY queue",
858    )
859    .fetch_all(executor)
860    .await?;
861
862    Ok(rows)
863}
864
865/// Retry multiple jobs by ID. Only retries failed, cancelled, or waiting_external jobs.
866pub async fn bulk_retry<'e, E>(executor: E, ids: &[i64]) -> Result<Vec<JobRow>, AwaError>
867where
868    E: PgExecutor<'e>,
869{
870    let rows = sqlx::query_as::<_, JobRow>(
871        r#"
872        UPDATE awa.jobs
873        SET state = 'available', attempt = 0, run_at = now(),
874            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
875            callback_id = NULL, callback_timeout_at = NULL,
876            callback_filter = NULL, callback_on_complete = NULL,
877            callback_on_fail = NULL, callback_transform = NULL
878        WHERE id = ANY($1) AND state IN ('failed', 'cancelled', 'waiting_external')
879        RETURNING *
880        "#,
881    )
882    .bind(ids)
883    .fetch_all(executor)
884    .await?;
885
886    Ok(rows)
887}
888
889/// Cancel multiple jobs by ID. Only cancels non-terminal jobs.
890pub async fn bulk_cancel<'e, E>(executor: E, ids: &[i64]) -> Result<Vec<JobRow>, AwaError>
891where
892    E: PgExecutor<'e>,
893{
894    let rows = sqlx::query_as::<_, JobRow>(
895        r#"
896        UPDATE awa.jobs
897        SET state = 'cancelled', finalized_at = now(),
898            callback_id = NULL, callback_timeout_at = NULL,
899            callback_filter = NULL, callback_on_complete = NULL,
900            callback_on_fail = NULL, callback_transform = NULL
901        WHERE id = ANY($1) AND state NOT IN ('completed', 'failed', 'cancelled')
902        RETURNING *
903        "#,
904    )
905    .bind(ids)
906    .fetch_all(executor)
907    .await?;
908
909    Ok(rows)
910}
911
912/// A bucketed count of jobs by state over time.
913#[derive(Debug, Clone, Serialize)]
914pub struct StateTimeseriesBucket {
915    pub bucket: chrono::DateTime<chrono::Utc>,
916    pub state: JobState,
917    pub count: i64,
918}
919
920/// Return time-bucketed state counts over the last N minutes.
921pub async fn state_timeseries<'e, E>(
922    executor: E,
923    minutes: i32,
924) -> Result<Vec<StateTimeseriesBucket>, AwaError>
925where
926    E: PgExecutor<'e>,
927{
928    let rows = sqlx::query_as::<_, (chrono::DateTime<chrono::Utc>, JobState, i64)>(
929        r#"
930        SELECT
931            date_trunc('minute', created_at) AS bucket,
932            state,
933            count(*) AS count
934        FROM awa.jobs
935        WHERE created_at >= now() - make_interval(mins => $1)
936        GROUP BY bucket, state
937        ORDER BY bucket
938        "#,
939    )
940    .bind(minutes)
941    .fetch_all(executor)
942    .await?;
943
944    Ok(rows
945        .into_iter()
946        .map(|(bucket, state, count)| StateTimeseriesBucket {
947            bucket,
948            state,
949            count,
950        })
951        .collect())
952}
953
954/// Register a callback for a running job, writing the callback_id and timeout
955/// to the database immediately.
956///
957/// Call this BEFORE sending the callback_id to the external system to avoid
958/// the race condition where the external system fires before the DB knows
959/// about the callback.
960///
961/// Returns the generated callback UUID on success.
962pub async fn register_callback<'e, E>(
963    executor: E,
964    job_id: i64,
965    run_lease: i64,
966    timeout: std::time::Duration,
967) -> Result<Uuid, AwaError>
968where
969    E: PgExecutor<'e>,
970{
971    let callback_id = Uuid::new_v4();
972    let timeout_secs = timeout.as_secs_f64();
973    let result = sqlx::query(
974        r#"UPDATE awa.jobs
975           SET callback_id = $2,
976               callback_timeout_at = now() + make_interval(secs => $3),
977               callback_filter = NULL,
978               callback_on_complete = NULL,
979               callback_on_fail = NULL,
980               callback_transform = NULL
981           WHERE id = $1 AND state = 'running' AND run_lease = $4"#,
982    )
983    .bind(job_id)
984    .bind(callback_id)
985    .bind(timeout_secs)
986    .bind(run_lease)
987    .execute(executor)
988    .await?;
989    if result.rows_affected() == 0 {
990        return Err(AwaError::Validation("job is not in running state".into()));
991    }
992    Ok(callback_id)
993}
994
995/// Complete a waiting job via external callback.
996///
997/// Accepts jobs in `waiting_external` or `running` state (race handling: the
998/// external system may fire before the executor transitions to `waiting_external`).
999///
1000/// When `resume` is `false` (default), the job transitions to `completed`.
1001/// When `resume` is `true`, the job transitions back to `running` with the
1002/// callback payload stored in metadata under `_awa_callback_result`. The
1003/// handler can then read the result and continue processing (sequential
1004/// callback pattern from ADR-016).
1005pub async fn complete_external<'e, E>(
1006    executor: E,
1007    callback_id: Uuid,
1008    payload: Option<serde_json::Value>,
1009    run_lease: Option<i64>,
1010) -> Result<JobRow, AwaError>
1011where
1012    E: PgExecutor<'e>,
1013{
1014    complete_external_inner(executor, callback_id, payload, run_lease, false).await
1015}
1016
1017/// Complete a waiting job and resume the handler with the callback payload.
1018///
1019/// Like `complete_external`, but the job transitions to `running` instead of
1020/// `completed`, allowing the handler to continue with sequential callbacks.
1021/// The payload is stored in `metadata._awa_callback_result`.
1022pub async fn resume_external<'e, E>(
1023    executor: E,
1024    callback_id: Uuid,
1025    payload: Option<serde_json::Value>,
1026    run_lease: Option<i64>,
1027) -> Result<JobRow, AwaError>
1028where
1029    E: PgExecutor<'e>,
1030{
1031    complete_external_inner(executor, callback_id, payload, run_lease, true).await
1032}
1033
1034async fn complete_external_inner<'e, E>(
1035    executor: E,
1036    callback_id: Uuid,
1037    payload: Option<serde_json::Value>,
1038    run_lease: Option<i64>,
1039    resume: bool,
1040) -> Result<JobRow, AwaError>
1041where
1042    E: PgExecutor<'e>,
1043{
1044    let row = if resume {
1045        // Resume: transition to running, store payload, refresh heartbeat.
1046        // The handler is still alive and polling — it will detect the state change.
1047        let payload_json = payload.unwrap_or(serde_json::Value::Null);
1048        sqlx::query_as::<_, JobRow>(
1049            r#"
1050            UPDATE awa.jobs
1051            SET state = 'running',
1052                callback_id = NULL,
1053                callback_timeout_at = NULL,
1054                callback_filter = NULL,
1055                callback_on_complete = NULL,
1056                callback_on_fail = NULL,
1057                callback_transform = NULL,
1058                heartbeat_at = now(),
1059                metadata = metadata || jsonb_build_object('_awa_callback_result', $3::jsonb)
1060            WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
1061              AND ($2::bigint IS NULL OR run_lease = $2)
1062            RETURNING *
1063            "#,
1064        )
1065        .bind(callback_id)
1066        .bind(run_lease)
1067        .bind(&payload_json)
1068        .fetch_optional(executor)
1069        .await?
1070    } else {
1071        // Complete: terminal state, clear everything.
1072        sqlx::query_as::<_, JobRow>(
1073            r#"
1074            UPDATE awa.jobs
1075            SET state = 'completed',
1076                finalized_at = now(),
1077                callback_id = NULL,
1078                callback_timeout_at = NULL,
1079                callback_filter = NULL,
1080                callback_on_complete = NULL,
1081                callback_on_fail = NULL,
1082                callback_transform = NULL,
1083                heartbeat_at = NULL,
1084                deadline_at = NULL,
1085                progress = NULL
1086            WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
1087              AND ($2::bigint IS NULL OR run_lease = $2)
1088            RETURNING *
1089            "#,
1090        )
1091        .bind(callback_id)
1092        .bind(run_lease)
1093        .fetch_optional(executor)
1094        .await?
1095    };
1096
1097    row.ok_or(AwaError::CallbackNotFound {
1098        callback_id: callback_id.to_string(),
1099    })
1100}
1101
1102/// Fail a waiting job via external callback.
1103///
1104/// Records the error and transitions to `failed`.
1105pub async fn fail_external<'e, E>(
1106    executor: E,
1107    callback_id: Uuid,
1108    error: &str,
1109    run_lease: Option<i64>,
1110) -> Result<JobRow, AwaError>
1111where
1112    E: PgExecutor<'e>,
1113{
1114    let row = sqlx::query_as::<_, JobRow>(
1115        r#"
1116        UPDATE awa.jobs
1117        SET state = 'failed',
1118            finalized_at = now(),
1119            callback_id = NULL,
1120            callback_timeout_at = NULL,
1121            callback_filter = NULL,
1122            callback_on_complete = NULL,
1123            callback_on_fail = NULL,
1124            callback_transform = NULL,
1125            heartbeat_at = NULL,
1126            deadline_at = NULL,
1127            errors = errors || jsonb_build_object(
1128                'error', $2::text,
1129                'attempt', attempt,
1130                'at', now()
1131            )::jsonb
1132        WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
1133          AND ($3::bigint IS NULL OR run_lease = $3)
1134        RETURNING *
1135        "#,
1136    )
1137    .bind(callback_id)
1138    .bind(error)
1139    .bind(run_lease)
1140    .fetch_optional(executor)
1141    .await?;
1142
1143    row.ok_or(AwaError::CallbackNotFound {
1144        callback_id: callback_id.to_string(),
1145    })
1146}
1147
1148/// Retry a waiting job via external callback.
1149///
1150/// Resets to `available` with attempt = 0. The handler must be idempotent
1151/// with respect to the external call — a retry re-executes from scratch.
1152///
1153/// Only accepts `waiting_external` state — unlike complete/fail which are
1154/// terminal transitions, retry puts the job back to `available`. Allowing
1155/// retry from `running` would risk concurrent dispatch if the original
1156/// handler hasn't finished yet.
1157pub async fn retry_external<'e, E>(
1158    executor: E,
1159    callback_id: Uuid,
1160    run_lease: Option<i64>,
1161) -> Result<JobRow, AwaError>
1162where
1163    E: PgExecutor<'e>,
1164{
1165    let row = sqlx::query_as::<_, JobRow>(
1166        r#"
1167        UPDATE awa.jobs
1168        SET state = 'available',
1169            attempt = 0,
1170            run_at = now(),
1171            finalized_at = NULL,
1172            callback_id = NULL,
1173            callback_timeout_at = NULL,
1174            callback_filter = NULL,
1175            callback_on_complete = NULL,
1176            callback_on_fail = NULL,
1177            callback_transform = NULL,
1178            heartbeat_at = NULL,
1179            deadline_at = NULL
1180        WHERE callback_id = $1 AND state = 'waiting_external'
1181          AND ($2::bigint IS NULL OR run_lease = $2)
1182        RETURNING *
1183        "#,
1184    )
1185    .bind(callback_id)
1186    .bind(run_lease)
1187    .fetch_optional(executor)
1188    .await?;
1189
1190    row.ok_or(AwaError::CallbackNotFound {
1191        callback_id: callback_id.to_string(),
1192    })
1193}
1194
1195/// Reset the callback timeout for a long-running external operation.
1196///
1197/// External systems call this periodically to signal "still working" without
1198/// completing the job. Resets `callback_timeout_at` to `now() + timeout`.
1199/// The job stays in `waiting_external`.
1200///
1201/// Returns the updated job row, or `CallbackNotFound` if the callback ID
1202/// doesn't match a waiting job.
1203pub async fn heartbeat_callback<'e, E>(
1204    executor: E,
1205    callback_id: Uuid,
1206    timeout: std::time::Duration,
1207) -> Result<JobRow, AwaError>
1208where
1209    E: PgExecutor<'e>,
1210{
1211    let timeout_secs = timeout.as_secs_f64();
1212    let row = sqlx::query_as::<_, JobRow>(
1213        r#"
1214        UPDATE awa.jobs
1215        SET callback_timeout_at = now() + make_interval(secs => $2)
1216        WHERE callback_id = $1 AND state = 'waiting_external'
1217        RETURNING *
1218        "#,
1219    )
1220    .bind(callback_id)
1221    .bind(timeout_secs)
1222    .fetch_optional(executor)
1223    .await?;
1224
1225    row.ok_or(AwaError::CallbackNotFound {
1226        callback_id: callback_id.to_string(),
1227    })
1228}
1229
1230/// Cancel (clear) a registered callback for a running job.
1231///
1232/// Best-effort cleanup: returns `Ok(true)` if a row was updated,
1233/// `Ok(false)` if no match (already resolved, rescued, or wrong lease).
1234/// Callers should not treat `false` as an error.
1235pub async fn cancel_callback<'e, E>(
1236    executor: E,
1237    job_id: i64,
1238    run_lease: i64,
1239) -> Result<bool, AwaError>
1240where
1241    E: PgExecutor<'e>,
1242{
1243    let result = sqlx::query(
1244        r#"
1245        UPDATE awa.jobs
1246        SET callback_id = NULL,
1247            callback_timeout_at = NULL,
1248            callback_filter = NULL,
1249            callback_on_complete = NULL,
1250            callback_on_fail = NULL,
1251            callback_transform = NULL
1252        WHERE id = $1 AND callback_id IS NOT NULL AND state = 'running' AND run_lease = $2
1253        "#,
1254    )
1255    .bind(job_id)
1256    .bind(run_lease)
1257    .execute(executor)
1258    .await?;
1259
1260    Ok(result.rows_affected() > 0)
1261}
1262
1263// ── Sequential callback wait helpers ─────────────────────────────────
1264//
1265// These functions extract the DB-interaction logic for `wait_for_callback`
1266// so that both the Rust `JobContext` and the Python bridge call the same
1267// code paths.
1268
/// Result of a single poll iteration inside `wait_for_callback`.
///
/// Produced by [`check_callback_state`].
#[derive(Debug)]
pub enum CallbackPollResult {
    /// The callback was resolved and the payload is ready.
    ///
    /// Holds the value that was stored under `metadata._awa_callback_result`.
    Resolved(serde_json::Value),
    /// Still waiting — caller should sleep and poll again.
    Pending,
    /// The callback token is stale (a different callback is current).
    Stale {
        /// The callback id the caller was waiting on.
        token: Uuid,
        /// The callback id currently stored on the job.
        current: Uuid,
        /// The job's state at poll time.
        state: JobState,
    },
    /// The job left the wait unexpectedly (rescued, cancelled, etc.).
    UnexpectedState {
        /// The callback id the caller was waiting on.
        token: Uuid,
        /// The job's state at poll time.
        state: JobState,
    },
    /// The job was not found.
    NotFound,
}
1287
1288/// Transition a running job to `waiting_external` for the given callback.
1289///
1290/// Returns `Ok(true)` if the transition succeeded, `Ok(false)` if the row
1291/// did not match (the caller should check for an early-resume race).
1292pub async fn enter_callback_wait(
1293    pool: &PgPool,
1294    job_id: i64,
1295    run_lease: i64,
1296    callback_id: Uuid,
1297) -> Result<bool, AwaError> {
1298    let result = sqlx::query(
1299        r#"
1300        UPDATE awa.jobs
1301        SET state = 'waiting_external',
1302            heartbeat_at = NULL,
1303            deadline_at = NULL
1304        WHERE id = $1 AND state = 'running' AND run_lease = $2 AND callback_id = $3
1305        "#,
1306    )
1307    .bind(job_id)
1308    .bind(run_lease)
1309    .bind(callback_id)
1310    .execute(pool)
1311    .await?;
1312
1313    Ok(result.rows_affected() > 0)
1314}
1315
/// Check the current state of a job during callback wait.
///
/// Reads `state`, `callback_id` and `metadata` for `job_id` and classifies the
/// row against the `callback_id` the caller is waiting on. Match arms are tried
/// in order:
///
/// 1. `running`, no `callback_id`, and `_awa_callback_result` present in
///    metadata → `Resolved`. The payload is taken, and removed from the stored
///    metadata, via [`take_callback_payload`]. This also covers the
///    early-resume race: if `resume_external` won before the handler
///    transitioned to `waiting_external`, the result is already in metadata.
/// 2. Any state whose `callback_id` is set but differs from ours → `Stale`.
/// 3. `waiting_external` with our `callback_id` → `Pending`.
/// 4. Any other existing row → `UnexpectedState`.
///
/// A missing row yields `NotFound`.
///
/// # Errors
///
/// Returns [`AwaError`] if the `SELECT` or the metadata cleanup `UPDATE` fails.
pub async fn check_callback_state(
    pool: &PgPool,
    job_id: i64,
    callback_id: Uuid,
) -> Result<CallbackPollResult, AwaError> {
    let row: Option<(JobState, Option<Uuid>, serde_json::Value)> =
        sqlx::query_as("SELECT state, callback_id, metadata FROM awa.jobs WHERE id = $1")
            .bind(job_id)
            .fetch_optional(pool)
            .await?;

    // The resolved arm must come before the catch-all `Some((state, _, _))`,
    // which would otherwise swallow the running/no-callback case.
    match row {
        // Resumed: the resume path of `complete_external_inner` cleared
        // callback_id and stored the payload in metadata.
        Some((JobState::Running, None, metadata))
            if metadata.get("_awa_callback_result").is_some() =>
        {
            let payload = take_callback_payload(pool, job_id, metadata).await?;
            Ok(CallbackPollResult::Resolved(payload))
        }
        // A different callback is now registered on this job.
        Some((state, Some(current_callback_id), _)) if current_callback_id != callback_id => {
            Ok(CallbackPollResult::Stale {
                token: callback_id,
                current: current_callback_id,
                state,
            })
        }
        // Still parked on our callback.
        Some((JobState::WaitingExternal, Some(current), _)) if current == callback_id => {
            Ok(CallbackPollResult::Pending)
        }
        // For example: still `running` with our callback (the wait was never
        // entered), a terminal state, or `running` without a stored result.
        Some((state, _, _)) => Ok(CallbackPollResult::UnexpectedState {
            token: callback_id,
            state,
        }),
        None => Ok(CallbackPollResult::NotFound),
    }
}
1356
1357/// Extract the `_awa_callback_result` key from metadata and clean it up.
1358pub async fn take_callback_payload(
1359    pool: &PgPool,
1360    job_id: i64,
1361    metadata: serde_json::Value,
1362) -> Result<serde_json::Value, AwaError> {
1363    let payload = metadata
1364        .get("_awa_callback_result")
1365        .cloned()
1366        .unwrap_or(serde_json::Value::Null);
1367
1368    sqlx::query("UPDATE awa.jobs SET metadata = metadata - '_awa_callback_result' WHERE id = $1")
1369        .bind(job_id)
1370        .execute(pool)
1371        .await?;
1372
1373    Ok(payload)
1374}
1375
1376// ── CEL callback expressions ──────────────────────────────────────────
1377
/// Optional CEL expressions evaluated when a callback is resolved.
///
/// Every field is optional. The `Default` value has all fields `None` and
/// registers a callback with no expression evaluation, exactly like plain
/// `register_callback`.
#[derive(Debug, Clone, Default)]
pub struct CallbackConfig {
    /// Gate: should this payload be processed at all? Expected to yield a bool.
    pub filter: Option<String>,
    /// Does this payload indicate success? Expected to yield a bool.
    pub on_complete: Option<String>,
    /// Does this payload indicate failure? Expected to yield a bool.
    /// Evaluated before `on_complete`, so failure wins when both match.
    pub on_fail: Option<String>,
    /// Reshapes the payload before it is returned to the caller; may yield any
    /// value.
    pub transform: Option<String>,
}

impl CallbackConfig {
    /// Returns `true` when none of the four expressions is set.
    pub fn is_empty(&self) -> bool {
        [&self.filter, &self.on_complete, &self.on_fail, &self.transform]
            .iter()
            .all(|expr| expr.is_none())
    }
}
1403
/// What `resolve_callback` should do if no CEL conditions match or no
/// expressions are configured.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DefaultAction {
    /// Mark the job `completed`, passing the payload through untransformed.
    Complete,
    /// Mark the job `failed` with the error "callback failed: default action".
    Fail,
    /// Leave the job unchanged and report the callback as ignored.
    Ignore,
}
1412
/// Outcome of `resolve_callback`.
#[derive(Debug)]
pub enum ResolveOutcome {
    /// The job was moved to `completed`.
    Completed {
        /// The payload to hand back, after the CEL `transform` when
        /// `on_complete` matched. It can be `None` only when the default
        /// action completed the job and no payload was supplied.
        payload: Option<serde_json::Value>,
        /// The job row as returned by the completing `UPDATE`.
        job: JobRow,
    },
    /// The job was moved to `failed` and an error entry was appended.
    Failed {
        /// The job row as returned by the failing `UPDATE`.
        job: JobRow,
    },
    /// No state change was made.
    Ignored {
        /// Human-readable explanation, e.g. that the filter returned false.
        reason: String,
    },
}
1427
1428impl ResolveOutcome {
1429    pub fn is_completed(&self) -> bool {
1430        matches!(self, ResolveOutcome::Completed { .. })
1431    }
1432    pub fn is_failed(&self) -> bool {
1433        matches!(self, ResolveOutcome::Failed { .. })
1434    }
1435    pub fn is_ignored(&self) -> bool {
1436        matches!(self, ResolveOutcome::Ignored { .. })
1437    }
1438}
1439
1440/// Register a callback with optional CEL expressions.
1441///
1442/// When expressions are provided and the `cel` feature is enabled, each
1443/// expression is trial-compiled at registration time so syntax errors are
1444/// caught early.
1445///
1446/// When the `cel` feature is disabled and any expression is non-None,
1447/// returns `AwaError::Validation`.
1448pub async fn register_callback_with_config<'e, E>(
1449    executor: E,
1450    job_id: i64,
1451    run_lease: i64,
1452    timeout: std::time::Duration,
1453    config: &CallbackConfig,
1454) -> Result<Uuid, AwaError>
1455where
1456    E: PgExecutor<'e>,
1457{
1458    // Validate CEL expressions at registration time: compile + check references
1459    #[cfg(feature = "cel")]
1460    {
1461        for (name, expr) in [
1462            ("filter", &config.filter),
1463            ("on_complete", &config.on_complete),
1464            ("on_fail", &config.on_fail),
1465            ("transform", &config.transform),
1466        ] {
1467            if let Some(src) = expr {
1468                let program = cel::Program::compile(src).map_err(|e| {
1469                    AwaError::Validation(format!("invalid CEL expression for {name}: {e}"))
1470                })?;
1471
1472                // Reject undeclared variables — CEL only reports these at execution
1473                // time, so an expression like `missing == 1` would parse fine but
1474                // silently fall into the fail-open path at resolve time.
1475                let refs = program.references();
1476                let bad_vars: Vec<&str> = refs
1477                    .variables()
1478                    .into_iter()
1479                    .filter(|v| *v != "payload")
1480                    .collect();
1481                if !bad_vars.is_empty() {
1482                    return Err(AwaError::Validation(format!(
1483                        "CEL expression for {name} references undeclared variable(s): {}; \
1484                         only 'payload' is available",
1485                        bad_vars.join(", ")
1486                    )));
1487                }
1488            }
1489        }
1490    }
1491
1492    #[cfg(not(feature = "cel"))]
1493    {
1494        if !config.is_empty() {
1495            return Err(AwaError::Validation(
1496                "CEL expressions require the 'cel' feature".into(),
1497            ));
1498        }
1499    }
1500
1501    let callback_id = Uuid::new_v4();
1502    let timeout_secs = timeout.as_secs_f64();
1503
1504    let result = sqlx::query(
1505        r#"UPDATE awa.jobs
1506           SET callback_id = $2,
1507               callback_timeout_at = now() + make_interval(secs => $3),
1508               callback_filter = $4,
1509               callback_on_complete = $5,
1510               callback_on_fail = $6,
1511               callback_transform = $7
1512           WHERE id = $1 AND state = 'running' AND run_lease = $8"#,
1513    )
1514    .bind(job_id)
1515    .bind(callback_id)
1516    .bind(timeout_secs)
1517    .bind(&config.filter)
1518    .bind(&config.on_complete)
1519    .bind(&config.on_fail)
1520    .bind(&config.transform)
1521    .bind(run_lease)
1522    .execute(executor)
1523    .await?;
1524
1525    if result.rows_affected() == 0 {
1526        return Err(AwaError::Validation("job is not in running state".into()));
1527    }
1528    Ok(callback_id)
1529}
1530
/// Internal action decided by CEL evaluation or default.
enum ResolveAction {
    /// Complete the job; carries the (possibly transformed) payload to report.
    Complete(Option<serde_json::Value>),
    /// Fail the job.
    Fail {
        /// Message recorded in the job's `errors`.
        error: String,
        /// The `on_fail` expression source, when that expression caused the
        /// failure; `None` for the default action.
        expression: Option<String>,
    },
    /// Leave the job unchanged; carries a human-readable reason.
    Ignore(String),
}
1540
1541/// Resolve a callback by evaluating CEL expressions against the payload.
1542///
1543/// Uses a transaction with `SELECT ... FOR UPDATE` for atomicity.
1544/// The `default_action` determines behaviour when no CEL conditions match
1545/// or no expressions are configured.
1546pub async fn resolve_callback(
1547    pool: &PgPool,
1548    callback_id: Uuid,
1549    payload: Option<serde_json::Value>,
1550    default_action: DefaultAction,
1551    run_lease: Option<i64>,
1552) -> Result<ResolveOutcome, AwaError> {
1553    let mut tx = pool.begin().await?;
1554
1555    // Query jobs_hot directly (not the awa.jobs UNION ALL view) because
1556    // FOR UPDATE is not reliably supported on UNION views. Waiting_external
1557    // and running jobs are always in jobs_hot (the check constraint on
1558    // scheduled_jobs only allows scheduled/retryable).
1559    //
1560    // Accepts both 'waiting_external' and 'running' to handle the race where
1561    // a fast callback arrives before the executor transitions running ->
1562    // waiting_external (matching complete_external/fail_external behavior).
1563    let job = sqlx::query_as::<_, JobRow>(
1564        "SELECT * FROM awa.jobs_hot WHERE callback_id = $1
1565         AND state IN ('waiting_external', 'running')
1566         AND ($2::bigint IS NULL OR run_lease = $2)
1567         FOR UPDATE",
1568    )
1569    .bind(callback_id)
1570    .bind(run_lease)
1571    .fetch_optional(&mut *tx)
1572    .await?
1573    .ok_or(AwaError::CallbackNotFound {
1574        callback_id: callback_id.to_string(),
1575    })?;
1576
1577    let action = evaluate_or_default(&job, &payload, default_action)?;
1578
1579    match action {
1580        ResolveAction::Complete(transformed_payload) => {
1581            let completed_job = sqlx::query_as::<_, JobRow>(
1582                r#"
1583                UPDATE awa.jobs
1584                SET state = 'completed',
1585                    finalized_at = now(),
1586                    callback_id = NULL,
1587                    callback_timeout_at = NULL,
1588                    callback_filter = NULL,
1589                    callback_on_complete = NULL,
1590                    callback_on_fail = NULL,
1591                    callback_transform = NULL,
1592                    heartbeat_at = NULL,
1593                    deadline_at = NULL,
1594                    progress = NULL
1595                WHERE id = $1
1596                RETURNING *
1597                "#,
1598            )
1599            .bind(job.id)
1600            .fetch_one(&mut *tx)
1601            .await?;
1602
1603            tx.commit().await?;
1604            Ok(ResolveOutcome::Completed {
1605                payload: transformed_payload,
1606                job: completed_job,
1607            })
1608        }
1609        ResolveAction::Fail { error, expression } => {
1610            let mut error_json = serde_json::json!({
1611                "error": error,
1612                "attempt": job.attempt,
1613                "at": chrono::Utc::now().to_rfc3339(),
1614            });
1615            if let Some(expr) = expression {
1616                error_json["expression"] = serde_json::Value::String(expr);
1617            }
1618
1619            let failed_job = sqlx::query_as::<_, JobRow>(
1620                r#"
1621                UPDATE awa.jobs
1622                SET state = 'failed',
1623                    finalized_at = now(),
1624                    callback_id = NULL,
1625                    callback_timeout_at = NULL,
1626                    callback_filter = NULL,
1627                    callback_on_complete = NULL,
1628                    callback_on_fail = NULL,
1629                    callback_transform = NULL,
1630                    heartbeat_at = NULL,
1631                    deadline_at = NULL,
1632                    errors = errors || $2::jsonb
1633                WHERE id = $1
1634                RETURNING *
1635                "#,
1636            )
1637            .bind(job.id)
1638            .bind(error_json)
1639            .fetch_one(&mut *tx)
1640            .await?;
1641
1642            tx.commit().await?;
1643            Ok(ResolveOutcome::Failed { job: failed_job })
1644        }
1645        ResolveAction::Ignore(reason) => {
1646            // No state change — dropping tx releases FOR UPDATE lock
1647            Ok(ResolveOutcome::Ignored { reason })
1648        }
1649    }
1650}
1651
1652/// Evaluate CEL expressions or fall through to default_action.
1653fn evaluate_or_default(
1654    job: &JobRow,
1655    payload: &Option<serde_json::Value>,
1656    default_action: DefaultAction,
1657) -> Result<ResolveAction, AwaError> {
1658    let has_expressions = job.callback_filter.is_some()
1659        || job.callback_on_complete.is_some()
1660        || job.callback_on_fail.is_some()
1661        || job.callback_transform.is_some();
1662
1663    if !has_expressions {
1664        return Ok(apply_default(default_action, payload));
1665    }
1666
1667    #[cfg(feature = "cel")]
1668    {
1669        Ok(evaluate_cel(job, payload, default_action))
1670    }
1671
1672    #[cfg(not(feature = "cel"))]
1673    {
1674        // Expressions are present but CEL feature is not enabled.
1675        // Return an error without mutating the job — it stays in waiting_external.
1676        let _ = (payload, default_action);
1677        Err(AwaError::Validation(
1678            "CEL expressions present but 'cel' feature is not enabled".into(),
1679        ))
1680    }
1681}
1682
1683fn apply_default(
1684    default_action: DefaultAction,
1685    payload: &Option<serde_json::Value>,
1686) -> ResolveAction {
1687    match default_action {
1688        DefaultAction::Complete => ResolveAction::Complete(payload.clone()),
1689        DefaultAction::Fail => ResolveAction::Fail {
1690            error: "callback failed: default action".to_string(),
1691            expression: None,
1692        },
1693        DefaultAction::Ignore => {
1694            ResolveAction::Ignore("no expressions configured, default is ignore".to_string())
1695        }
1696    }
1697}
1698
/// Run the job's stored CEL expressions against `payload`.
///
/// The stages run in a fixed order: filter, then on_fail, then on_complete,
/// then `default_action`. Every expression fails open: an evaluation error
/// never drops, fails, or completes the job by itself.
#[cfg(feature = "cel")]
fn evaluate_cel(
    job: &JobRow,
    payload: &Option<serde_json::Value>,
    default_action: DefaultAction,
) -> ResolveAction {
    let payload_value = payload.clone().unwrap_or(serde_json::Value::Null);

    // Stage 1 — filter gate. Only an explicit `false` drops the payload; an
    // evaluation error lets it through.
    if let Some(expr) = &job.callback_filter {
        if eval_bool(expr, &payload_value, job.id, "filter") == Ok(false) {
            return ResolveAction::Ignore("filter expression returned false".to_string());
        }
    }

    // Stage 2 — failure check, deliberately ahead of completion so a payload
    // matching both is treated as a failure. Errors count as "no".
    if let Some(expr) = &job.callback_on_fail {
        if eval_bool(expr, &payload_value, job.id, "on_fail") == Ok(true) {
            return ResolveAction::Fail {
                error: "callback failed: on_fail expression matched".to_string(),
                expression: Some(expr.clone()),
            };
        }
    }

    // Stage 3 — completion check. On a match the optional transform reshapes
    // the payload. Errors count as "no".
    if let Some(expr) = &job.callback_on_complete {
        if eval_bool(expr, &payload_value, job.id, "on_complete") == Ok(true) {
            return ResolveAction::Complete(Some(apply_transform(job, &payload_value)));
        }
    }

    // Stage 4 — nothing decided, so fall back to the caller's default. This
    // passes the original `payload` through without applying `transform`.
    apply_default(default_action, payload)
}
1754
/// Evaluate a boolean CEL `expression` with `payload` bound to `payload_value`.
///
/// Returns `Ok(b)` only when the expression compiles, runs, and yields a bool.
/// Every other failure is logged at `warn` with the job id and expression name
/// and collapsed to `Err(())`; callers decide how to fail open. Failures are:
/// - a compile error;
/// - a context setup error;
/// - a runtime error;
/// - a non-bool result.
#[cfg(feature = "cel")]
fn eval_bool(
    expression: &str,
    payload_value: &serde_json::Value,
    job_id: i64,
    expression_name: &str,
) -> Result<bool, ()> {
    let program = cel::Program::compile(expression).map_err(|e| {
        tracing::warn!(
            job_id,
            expression_name,
            expression,
            error = %e,
            "CEL compilation error during evaluation"
        );
    })?;

    let mut context = cel::Context::default();
    context
        .add_variable("payload", payload_value.clone())
        .map_err(|e| {
            tracing::warn!(
                job_id,
                expression_name,
                error = %e,
                "Failed to add payload variable to CEL context"
            );
        })?;

    let outcome = program.execute(&context).map_err(|e| {
        tracing::warn!(
            job_id,
            expression_name,
            expression,
            error = %e,
            "CEL execution error"
        );
    })?;

    if let cel::Value::Bool(flag) = &outcome {
        return Ok(*flag);
    }
    tracing::warn!(
        job_id,
        expression_name,
        expression,
        result_type = ?outcome.type_of(),
        "CEL expression returned non-bool"
    );
    Err(())
}
1811
/// Apply the job's CEL `transform` expression to `payload_value`.
///
/// Returns the payload unchanged when no transform is configured. It is also
/// returned unchanged, after a warning, when the transform fails to compile,
/// fails to bind `payload`, fails to execute, or yields a value that cannot
/// be converted to JSON.
#[cfg(feature = "cel")]
fn apply_transform(job: &JobRow, payload_value: &serde_json::Value) -> serde_json::Value {
    job.callback_transform
        .as_ref()
        .and_then(|transform_expr| run_cel_transform(job.id, transform_expr, payload_value))
        .unwrap_or_else(|| payload_value.clone())
}

/// Compile and run one transform expression; `None` (after a warning) on failure.
#[cfg(feature = "cel")]
fn run_cel_transform(
    job_id: i64,
    transform_expr: &str,
    payload_value: &serde_json::Value,
) -> Option<serde_json::Value> {
    let program = match cel::Program::compile(transform_expr) {
        Ok(program) => program,
        Err(e) => {
            tracing::warn!(
                job_id,
                expression = transform_expr,
                error = %e,
                "CEL transform compilation error, using original payload"
            );
            return None;
        }
    };

    let mut context = cel::Context::default();
    if let Err(e) = context.add_variable("payload", payload_value.clone()) {
        tracing::warn!(
            job_id,
            error = %e,
            "Failed to add payload variable for transform"
        );
        return None;
    }

    let value = match program.execute(&context) {
        Ok(value) => value,
        Err(e) => {
            tracing::warn!(
                job_id,
                expression = transform_expr,
                error = %e,
                "CEL transform execution error, using original payload"
            );
            return None;
        }
    };

    match value.json() {
        Ok(json) => Some(json),
        Err(e) => {
            tracing::warn!(
                job_id,
                expression = transform_expr,
                error = %e,
                "CEL transform result could not be converted to JSON, using original payload"
            );
            None
        }
    }
}