//! Admin operations for the awa job queue: single and bulk retry/cancel,
//! queue pause/resume/drain, worker runtime observability snapshots, and
//! cached queue statistics (`awa_model/admin.rs`).
1use crate::error::AwaError;
2use crate::job::{JobRow, JobState};
3use chrono::{DateTime, Duration, Utc};
4use serde::{Deserialize, Serialize};
5use sqlx::types::Json;
6use sqlx::PgExecutor;
7use sqlx::PgPool;
8use std::cmp::max;
9use std::collections::HashMap;
10use uuid::Uuid;
11
12/// Retry a single failed, cancelled, or waiting_external job.
13pub async fn retry<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
14where
15    E: PgExecutor<'e>,
16{
17    sqlx::query_as::<_, JobRow>(
18        r#"
19        UPDATE awa.jobs
20        SET state = 'available', attempt = 0, run_at = now(),
21            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
22            callback_id = NULL, callback_timeout_at = NULL,
23            callback_filter = NULL, callback_on_complete = NULL,
24            callback_on_fail = NULL, callback_transform = NULL
25        WHERE id = $1 AND state IN ('failed', 'cancelled', 'waiting_external')
26        RETURNING *
27        "#,
28    )
29    .bind(job_id)
30    .fetch_optional(executor)
31    .await?
32    .ok_or(AwaError::JobNotFound { id: job_id })
33    .map(Some)
34}
35
36/// Cancel a single non-terminal job.
37pub async fn cancel<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
38where
39    E: PgExecutor<'e>,
40{
41    sqlx::query_as::<_, JobRow>(
42        r#"
43        UPDATE awa.jobs
44        SET state = 'cancelled', finalized_at = now(),
45            callback_id = NULL, callback_timeout_at = NULL,
46            callback_filter = NULL, callback_on_complete = NULL,
47            callback_on_fail = NULL, callback_transform = NULL
48        WHERE id = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
49        RETURNING *
50        "#,
51    )
52    .bind(job_id)
53    .fetch_optional(executor)
54    .await?
55    .ok_or(AwaError::JobNotFound { id: job_id })
56    .map(Some)
57}
58
59/// Cancel a job by its unique key components.
60///
61/// Reconstructs the BLAKE3 unique key from the same inputs used at insert time
62/// (kind, optional queue, optional args, optional period bucket), then cancels
63/// the single oldest matching non-terminal job. Returns `None` if no matching
64/// job was found (already completed, already cancelled, or never existed).
65///
66/// The parameters must match what was used at insert time: pass `queue` only if
67/// the original `UniqueOpts` had `by_queue: true`, `args` only if `by_args: true`,
68/// and `period_bucket` only if `by_period` was set. Mismatched components produce
69/// a different hash and the job won't be found.
70///
71/// Only one job is cancelled per call (the oldest by `id`). This is intentional:
72/// unique key enforcement uses a state bitmask, so multiple rows with the same
73/// key can legally coexist (e.g., one `waiting_external` + one `available`).
74/// Cancelling all of them in one shot would be surprising.
75///
76/// This is useful when the caller knows the job kind and args but not the job ID —
77/// e.g., cancelling a scheduled reminder when the triggering condition is resolved.
78///
79/// # Implementation notes
80///
81/// Queries `jobs_hot` and `scheduled_jobs` directly rather than the `awa.jobs`
82/// UNION ALL view, because PostgreSQL does not support `FOR UPDATE` on UNION
83/// views. The CTE selects candidate IDs without row locks; blocking on a
84/// concurrently-locked row (e.g., one being processed by a worker) happens
85/// implicitly during the UPDATE phase via the writable view trigger. If the
86/// worker completes the job before the UPDATE acquires the lock, the state
87/// check (`NOT IN ('completed', 'failed', 'cancelled')`) causes the cancel
88/// to no-op and return `None`.
89///
90/// The lookup scans `unique_key` on both physical tables without a dedicated
91/// index. This is acceptable for low-volume use cases. For high-volume tables,
92/// consider adding a partial index on `unique_key WHERE unique_key IS NOT NULL`
93/// or routing through `job_unique_claims` (which is already indexed).
94pub async fn cancel_by_unique_key<'e, E>(
95    executor: E,
96    kind: &str,
97    queue: Option<&str>,
98    args: Option<&serde_json::Value>,
99    period_bucket: Option<i64>,
100) -> Result<Option<JobRow>, AwaError>
101where
102    E: PgExecutor<'e>,
103{
104    let unique_key = crate::unique::compute_unique_key(kind, queue, args, period_bucket);
105
106    // Find the oldest matching job across both physical tables. CTE selects
107    // candidate IDs without row locks; blocking on concurrently-locked rows
108    // happens implicitly during the UPDATE via the writable view trigger.
109    let row = sqlx::query_as::<_, JobRow>(
110        r#"
111        WITH candidates AS (
112            SELECT id FROM awa.jobs_hot
113            WHERE unique_key = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
114            UNION ALL
115            SELECT id FROM awa.scheduled_jobs
116            WHERE unique_key = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
117            ORDER BY id ASC
118            LIMIT 1
119        )
120        UPDATE awa.jobs
121        SET state = 'cancelled', finalized_at = now(),
122            callback_id = NULL, callback_timeout_at = NULL,
123            callback_filter = NULL, callback_on_complete = NULL,
124            callback_on_fail = NULL, callback_transform = NULL
125        FROM candidates
126        WHERE awa.jobs.id = candidates.id
127        RETURNING awa.jobs.*
128        "#,
129    )
130    .bind(&unique_key)
131    .fetch_optional(executor)
132    .await?;
133
134    Ok(row)
135}
136
137/// Retry all failed jobs of a given kind.
138pub async fn retry_failed_by_kind<'e, E>(executor: E, kind: &str) -> Result<Vec<JobRow>, AwaError>
139where
140    E: PgExecutor<'e>,
141{
142    let rows = sqlx::query_as::<_, JobRow>(
143        r#"
144        UPDATE awa.jobs
145        SET state = 'available', attempt = 0, run_at = now(),
146            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
147        WHERE kind = $1 AND state = 'failed'
148        RETURNING *
149        "#,
150    )
151    .bind(kind)
152    .fetch_all(executor)
153    .await?;
154
155    Ok(rows)
156}
157
158/// Retry all failed jobs in a given queue.
159pub async fn retry_failed_by_queue<'e, E>(executor: E, queue: &str) -> Result<Vec<JobRow>, AwaError>
160where
161    E: PgExecutor<'e>,
162{
163    let rows = sqlx::query_as::<_, JobRow>(
164        r#"
165        UPDATE awa.jobs
166        SET state = 'available', attempt = 0, run_at = now(),
167            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL
168        WHERE queue = $1 AND state = 'failed'
169        RETURNING *
170        "#,
171    )
172    .bind(queue)
173    .fetch_all(executor)
174    .await?;
175
176    Ok(rows)
177}
178
179/// Discard (delete) all failed jobs of a given kind.
180pub async fn discard_failed<'e, E>(executor: E, kind: &str) -> Result<u64, AwaError>
181where
182    E: PgExecutor<'e>,
183{
184    let result = sqlx::query("DELETE FROM awa.jobs WHERE kind = $1 AND state = 'failed'")
185        .bind(kind)
186        .execute(executor)
187        .await?;
188
189    Ok(result.rows_affected())
190}
191
192/// Pause a queue. Affects all workers immediately.
193pub async fn pause_queue<'e, E>(
194    executor: E,
195    queue: &str,
196    paused_by: Option<&str>,
197) -> Result<(), AwaError>
198where
199    E: PgExecutor<'e>,
200{
201    sqlx::query(
202        r#"
203        INSERT INTO awa.queue_meta (queue, paused, paused_at, paused_by)
204        VALUES ($1, TRUE, now(), $2)
205        ON CONFLICT (queue) DO UPDATE SET paused = TRUE, paused_at = now(), paused_by = $2
206        "#,
207    )
208    .bind(queue)
209    .bind(paused_by)
210    .execute(executor)
211    .await?;
212
213    Ok(())
214}
215
216/// Resume a paused queue.
217pub async fn resume_queue<'e, E>(executor: E, queue: &str) -> Result<(), AwaError>
218where
219    E: PgExecutor<'e>,
220{
221    sqlx::query("UPDATE awa.queue_meta SET paused = FALSE WHERE queue = $1")
222        .bind(queue)
223        .execute(executor)
224        .await?;
225
226    Ok(())
227}
228
229/// Drain a queue: cancel all non-running, non-terminal jobs.
230pub async fn drain_queue<'e, E>(executor: E, queue: &str) -> Result<u64, AwaError>
231where
232    E: PgExecutor<'e>,
233{
234    let result = sqlx::query(
235        r#"
236        UPDATE awa.jobs
237        SET state = 'cancelled', finalized_at = now(),
238            callback_id = NULL, callback_timeout_at = NULL,
239            callback_filter = NULL, callback_on_complete = NULL,
240            callback_on_fail = NULL, callback_transform = NULL
241        WHERE queue = $1 AND state IN ('available', 'scheduled', 'retryable', 'waiting_external')
242        "#,
243    )
244    .bind(queue)
245    .execute(executor)
246    .await?;
247
248    Ok(result.rows_affected())
249}
250
/// Queue statistics.
///
/// Produced by [`queue_stats`]: per-state counts come from the
/// `queue_state_counts` cache table, while `lag_seconds` and
/// `completed_last_hour` are computed live from `jobs_hot`.
#[derive(Debug, Clone, Serialize)]
pub struct QueueStats {
    pub queue: String,
    /// All non-terminal jobs for the queue, including running and waiting_external.
    pub total_queued: i64,
    // Per-state counts (from the eventually-consistent cache, ~2s lag).
    pub scheduled: i64,
    pub available: i64,
    pub retryable: i64,
    pub running: i64,
    pub failed: i64,
    pub waiting_external: i64,
    /// Jobs that reached `completed` with `finalized_at` in the last hour.
    pub completed_last_hour: i64,
    /// Seconds since the oldest `available` job's `run_at`; `None` when
    /// nothing is currently available.
    pub lag_seconds: Option<f64>,
    /// `queue_meta.paused`, defaulting to `false` when no meta row exists.
    pub paused: bool,
}
267
/// Snapshot of a per-queue rate limit configuration.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RateLimitSnapshot {
    // Sustained rate; units (presumably jobs/sec) are defined by the
    // worker runtime that publishes the snapshot — TODO confirm.
    pub max_rate: f64,
    // Burst allowance above the sustained rate.
    pub burst: u32,
}
274
/// Runtime concurrency mode for a queue.
///
/// Serialized as `"hard_reserved"` / `"weighted"`. In `Weighted` mode
/// the aggregated summary also reports `overflow_held_total` (see
/// [`queue_runtime_summary`]).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum QueueRuntimeMode {
    HardReserved,
    Weighted,
}
282
/// Per-queue configuration published by a worker runtime instance.
///
/// Compared with `PartialEq` across instances to detect config drift
/// (`QueueRuntimeSummary::config_mismatch`).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueueRuntimeConfigSnapshot {
    pub mode: QueueRuntimeMode,
    pub max_workers: Option<u32>,
    pub min_workers: Option<u32>,
    // Relative share used in weighted mode — TODO confirm semantics with
    // the worker runtime.
    pub weight: Option<u32>,
    pub global_max_workers: Option<u32>,
    pub poll_interval_ms: u64,
    pub deadline_duration_secs: u64,
    pub priority_aging_interval_secs: u64,
    /// Optional per-queue rate limit; `None` when unlimited.
    pub rate_limit: Option<RateLimitSnapshot>,
}
296
/// Runtime state for one queue on one worker instance.
///
/// Stored as JSON inside `awa.runtime_instances.queues`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueueRuntimeSnapshot {
    pub queue: String,
    /// Jobs currently being executed by this instance for this queue.
    pub in_flight: u32,
    // Present only in some modes; aggregated per-queue when the config
    // mode is `Weighted`.
    pub overflow_held: Option<u32>,
    pub config: QueueRuntimeConfigSnapshot,
}
305
/// Data written by a worker runtime into the observability snapshot table.
///
/// Upserted by [`upsert_runtime_snapshot`]; `last_seen_at` is stamped
/// server-side (`now()`) and is therefore not a field here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeSnapshotInput {
    /// Unique identity of the worker process; the upsert conflict key.
    pub instance_id: Uuid,
    pub hostname: Option<String>,
    pub pid: i32,
    pub version: String,
    pub started_at: DateTime<Utc>,
    /// How often this instance publishes snapshots; used by readers to
    /// derive staleness (see `RuntimeInstance::stale_cutoff`).
    pub snapshot_interval_ms: i64,
    // Liveness/health flags as self-reported by the instance.
    pub healthy: bool,
    pub postgres_connected: bool,
    pub poll_loop_alive: bool,
    pub heartbeat_alive: bool,
    pub maintenance_alive: bool,
    pub shutting_down: bool,
    pub leader: bool,
    pub global_max_workers: Option<u32>,
    /// Per-queue runtime state, stored as a JSON array.
    pub queues: Vec<QueueRuntimeSnapshot>,
}
325
/// A worker runtime instance as exposed through the admin API.
///
/// Mirrors [`RuntimeSnapshotInput`] plus two read-side fields:
/// `last_seen_at` (stamped by the upsert) and `stale` (derived at query
/// time from `last_seen_at` and the snapshot interval).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeInstance {
    pub instance_id: Uuid,
    pub hostname: Option<String>,
    pub pid: i32,
    pub version: String,
    pub started_at: DateTime<Utc>,
    /// Server-side timestamp of the most recent snapshot upsert.
    pub last_seen_at: DateTime<Utc>,
    pub snapshot_interval_ms: i64,
    /// True when `last_seen_at` is older than the stale cutoff
    /// (3x snapshot interval, minimum 30s).
    pub stale: bool,
    pub healthy: bool,
    pub postgres_connected: bool,
    pub poll_loop_alive: bool,
    pub heartbeat_alive: bool,
    pub maintenance_alive: bool,
    pub shutting_down: bool,
    pub leader: bool,
    pub global_max_workers: Option<u32>,
    pub queues: Vec<QueueRuntimeSnapshot>,
}
347
348impl RuntimeInstance {
349    fn stale_cutoff(interval_ms: i64) -> Duration {
350        let interval_ms = max(interval_ms, 1_000);
351        Duration::milliseconds(max(interval_ms.saturating_mul(3), 30_000))
352    }
353
354    fn from_db_row(row: RuntimeInstanceRow, now: DateTime<Utc>) -> Self {
355        let stale = row.last_seen_at + Self::stale_cutoff(row.snapshot_interval_ms) < now;
356        Self {
357            instance_id: row.instance_id,
358            hostname: row.hostname,
359            pid: row.pid,
360            version: row.version,
361            started_at: row.started_at,
362            last_seen_at: row.last_seen_at,
363            snapshot_interval_ms: row.snapshot_interval_ms,
364            stale,
365            healthy: row.healthy,
366            postgres_connected: row.postgres_connected,
367            poll_loop_alive: row.poll_loop_alive,
368            heartbeat_alive: row.heartbeat_alive,
369            maintenance_alive: row.maintenance_alive,
370            shutting_down: row.shutting_down,
371            leader: row.leader,
372            global_max_workers: row.global_max_workers.map(|v| v as u32),
373            queues: row.queues.0,
374        }
375    }
376}
377
/// Cluster-wide runtime overview.
///
/// Produced by [`runtime_overview`]. `live`, `healthy`, and `leader`
/// counts exclude stale instances; `total = live + stale`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeOverview {
    pub total_instances: usize,
    pub live_instances: usize,
    pub stale_instances: usize,
    /// Live instances reporting `healthy = true`.
    pub healthy_instances: usize,
    /// Live instances reporting `leader = true`.
    pub leader_instances: usize,
    /// Full instance list, leader/most-recently-seen first.
    pub instances: Vec<RuntimeInstance>,
}
388
/// Queue-centric runtime/config summary aggregated across worker instances.
///
/// Produced by [`queue_runtime_summary`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueRuntimeSummary {
    pub queue: String,
    /// Instances (live or stale) that published a snapshot for this queue.
    pub instance_count: usize,
    pub live_instances: usize,
    pub stale_instances: usize,
    pub healthy_instances: usize,
    /// Sum of `in_flight` across live instances only.
    pub total_in_flight: u64,
    /// Sum of `overflow_held` across live instances; only reported
    /// (`Some`) when the representative config is in `Weighted` mode.
    pub overflow_held_total: Option<u64>,
    /// True when instances disagree on this queue's config.
    pub config_mismatch: bool,
    /// Representative config (first live instance's, falling back to a
    /// stale one); `None` only if the queue has no entries at all.
    pub config: Option<QueueRuntimeConfigSnapshot>,
}
402
/// Raw row shape for `awa.runtime_instances` reads; converted to the
/// public [`RuntimeInstance`] via `RuntimeInstance::from_db_row`.
#[derive(Debug, sqlx::FromRow)]
struct RuntimeInstanceRow {
    instance_id: Uuid,
    hostname: Option<String>,
    pid: i32,
    version: String,
    started_at: DateTime<Utc>,
    last_seen_at: DateTime<Utc>,
    snapshot_interval_ms: i64,
    healthy: bool,
    postgres_connected: bool,
    poll_loop_alive: bool,
    heartbeat_alive: bool,
    maintenance_alive: bool,
    shutting_down: bool,
    leader: bool,
    // Stored as INT in Postgres; widened to u32 on read.
    global_max_workers: Option<i32>,
    // JSONB column decoded through sqlx's Json wrapper.
    queues: Json<Vec<QueueRuntimeSnapshot>>,
}
422
/// Upsert a runtime observability snapshot for one worker instance.
///
/// Keyed on `instance_id`; every other column is overwritten on
/// conflict and `last_seen_at` is stamped server-side with `now()` on
/// both insert and update (so clock skew on the worker cannot make an
/// instance look stale or fresh).
pub async fn upsert_runtime_snapshot<'e, E>(
    executor: E,
    snapshot: &RuntimeSnapshotInput,
) -> Result<(), AwaError>
where
    E: PgExecutor<'e>,
{
    // NOTE: the bind order below must match the $1..$15 placeholders;
    // `last_seen_at` has no bind because it is `now()` in the VALUES list.
    sqlx::query(
        r#"
        INSERT INTO awa.runtime_instances (
            instance_id,
            hostname,
            pid,
            version,
            started_at,
            last_seen_at,
            snapshot_interval_ms,
            healthy,
            postgres_connected,
            poll_loop_alive,
            heartbeat_alive,
            maintenance_alive,
            shutting_down,
            leader,
            global_max_workers,
            queues
        )
        VALUES (
            $1, $2, $3, $4, $5, now(), $6, $7, $8, $9, $10, $11, $12, $13, $14, $15
        )
        ON CONFLICT (instance_id) DO UPDATE SET
            hostname = EXCLUDED.hostname,
            pid = EXCLUDED.pid,
            version = EXCLUDED.version,
            started_at = EXCLUDED.started_at,
            last_seen_at = now(),
            snapshot_interval_ms = EXCLUDED.snapshot_interval_ms,
            healthy = EXCLUDED.healthy,
            postgres_connected = EXCLUDED.postgres_connected,
            poll_loop_alive = EXCLUDED.poll_loop_alive,
            heartbeat_alive = EXCLUDED.heartbeat_alive,
            maintenance_alive = EXCLUDED.maintenance_alive,
            shutting_down = EXCLUDED.shutting_down,
            leader = EXCLUDED.leader,
            global_max_workers = EXCLUDED.global_max_workers,
            queues = EXCLUDED.queues
        "#,
    )
    .bind(snapshot.instance_id)
    .bind(snapshot.hostname.as_deref())
    .bind(snapshot.pid)
    .bind(&snapshot.version)
    .bind(snapshot.started_at)
    .bind(snapshot.snapshot_interval_ms)
    .bind(snapshot.healthy)
    .bind(snapshot.postgres_connected)
    .bind(snapshot.poll_loop_alive)
    .bind(snapshot.heartbeat_alive)
    .bind(snapshot.maintenance_alive)
    .bind(snapshot.shutting_down)
    .bind(snapshot.leader)
    // u32 -> i32 narrowing: values originate from worker config; assumed
    // to fit in i32 — matches the INT column written here.
    .bind(snapshot.global_max_workers.map(|v| v as i32))
    .bind(Json(&snapshot.queues))
    .execute(executor)
    .await?;

    Ok(())
}
492
493/// Opportunistically delete long-stale runtime snapshot rows.
494pub async fn cleanup_runtime_snapshots<'e, E>(
495    executor: E,
496    max_age: Duration,
497) -> Result<u64, AwaError>
498where
499    E: PgExecutor<'e>,
500{
501    let seconds = max(max_age.num_seconds(), 1);
502    let result = sqlx::query(
503        "DELETE FROM awa.runtime_instances WHERE last_seen_at < now() - make_interval(secs => $1)",
504    )
505    .bind(seconds)
506    .execute(executor)
507    .await?;
508
509    Ok(result.rows_affected())
510}
511
512/// List all runtime instances ordered with leader/live instances first.
513pub async fn list_runtime_instances<'e, E>(executor: E) -> Result<Vec<RuntimeInstance>, AwaError>
514where
515    E: PgExecutor<'e>,
516{
517    let rows = sqlx::query_as::<_, RuntimeInstanceRow>(
518        r#"
519        SELECT
520            instance_id,
521            hostname,
522            pid,
523            version,
524            started_at,
525            last_seen_at,
526            snapshot_interval_ms,
527            healthy,
528            postgres_connected,
529            poll_loop_alive,
530            heartbeat_alive,
531            maintenance_alive,
532            shutting_down,
533            leader,
534            global_max_workers,
535            queues
536        FROM awa.runtime_instances
537        ORDER BY leader DESC, last_seen_at DESC, started_at DESC
538        "#,
539    )
540    .fetch_all(executor)
541    .await?;
542
543    let now = Utc::now();
544    Ok(rows
545        .into_iter()
546        .map(|row| RuntimeInstance::from_db_row(row, now))
547        .collect())
548}
549
550/// Cluster runtime overview with instance list.
551pub async fn runtime_overview<'e, E>(executor: E) -> Result<RuntimeOverview, AwaError>
552where
553    E: PgExecutor<'e>,
554{
555    let instances = list_runtime_instances(executor).await?;
556    let total_instances = instances.len();
557    let stale_instances = instances.iter().filter(|i| i.stale).count();
558    let live_instances = total_instances.saturating_sub(stale_instances);
559    let healthy_instances = instances.iter().filter(|i| !i.stale && i.healthy).count();
560    let leader_instances = instances.iter().filter(|i| !i.stale && i.leader).count();
561
562    Ok(RuntimeOverview {
563        total_instances,
564        live_instances,
565        stale_instances,
566        healthy_instances,
567        leader_instances,
568        instances,
569    })
570}
571
/// Queue runtime/config summary aggregated across worker snapshots.
///
/// Groups every instance's per-queue snapshot by queue name, then for
/// each queue reports live/stale/healthy instance counts, total
/// in-flight work (live instances only), an optional overflow total
/// (weighted mode only), and whether instances disagree on config.
/// Results are sorted by queue name.
pub async fn queue_runtime_summary<'e, E>(executor: E) -> Result<Vec<QueueRuntimeSummary>, AwaError>
where
    E: PgExecutor<'e>,
{
    let instances = list_runtime_instances(executor).await?;
    // queue name -> (is_live, is_healthy, snapshot) per publishing
    // instance. Entry order follows list_runtime_instances ordering
    // (leader, then most recently seen), which matters below: the
    // "representative" config is the first candidate.
    let mut by_queue: HashMap<String, Vec<(bool, bool, QueueRuntimeSnapshot)>> = HashMap::new();

    for instance in instances {
        let is_live = !instance.stale;
        // Health is only meaningful for live instances.
        let is_healthy = is_live && instance.healthy;
        for queue in instance.queues {
            by_queue
                .entry(queue.queue.clone())
                .or_default()
                .push((is_live, is_healthy, queue));
        }
    }

    let mut summaries: Vec<_> = by_queue
        .into_iter()
        .map(|(queue, entries)| {
            let instance_count = entries.len();
            let live_instances = entries.iter().filter(|(live, _, _)| *live).count();
            let stale_instances = instance_count.saturating_sub(live_instances);
            let healthy_instances = entries.iter().filter(|(_, healthy, _)| *healthy).count();
            // In-flight totals only count live instances: stale rows may
            // describe work that no longer exists.
            let total_in_flight = entries
                .iter()
                .filter(|(live, _, _)| *live)
                .map(|(_, _, queue)| u64::from(queue.in_flight))
                .sum();

            // Sum of overflow_held across live instances; entries with
            // None simply contribute nothing.
            let overflow_total: u64 = entries
                .iter()
                .filter(|(live, _, _)| *live)
                .filter_map(|(_, _, queue)| queue.overflow_held.map(u64::from))
                .sum();

            // Prefer configs from live instances; fall back to stale
            // ones only when no live instance serves this queue.
            let live_configs: Vec<_> = entries
                .iter()
                .filter(|(live, _, _)| *live)
                .map(|(_, _, queue)| queue.config.clone())
                .collect();
            let config_candidates = if live_configs.is_empty() {
                entries
                    .iter()
                    .map(|(_, _, queue)| queue.config.clone())
                    .collect::<Vec<_>>()
            } else {
                live_configs
            };
            // First candidate is the representative config; a mismatch is
            // any other candidate that differs from it.
            let config = config_candidates.first().cloned();
            let config_mismatch = config_candidates
                .iter()
                .skip(1)
                .any(|candidate| Some(candidate) != config.as_ref());

            QueueRuntimeSummary {
                queue,
                instance_count,
                live_instances,
                stale_instances,
                healthy_instances,
                total_in_flight,
                // Overflow is only meaningful in weighted mode; hide it
                // (None) otherwise rather than reporting a misleading 0.
                overflow_held_total: config
                    .as_ref()
                    .filter(|cfg| cfg.mode == QueueRuntimeMode::Weighted)
                    .map(|_| overflow_total),
                config_mismatch,
                config,
            }
        })
        .collect();

    summaries.sort_by(|a, b| a.queue.cmp(&b.queue));
    Ok(summaries)
}
649
/// Get statistics for all queues.
///
/// Hybrid read: per-state counts come from the `queue_state_counts`
/// cache table (eventually consistent, ~2s lag), while `lag_seconds`
/// and `completed_last_hour` are computed live from `jobs_hot`.
///
/// The cache is kept fresh by the maintenance leader's dirty-key
/// recompute (~2s) and full reconciliation (~60s). Also warmed during
/// `migrate()`.
///
/// For exact cached counts in tests without a running maintenance
/// leader, call `flush_dirty_admin_metadata()` first.
pub async fn queue_stats<'e, E>(executor: E) -> Result<Vec<QueueStats>, AwaError>
where
    E: PgExecutor<'e>,
{
    // Tuple layout mirrors the SELECT column order exactly:
    // (queue, total_queued, scheduled, available, retryable, running,
    //  failed, waiting_external, completed_last_hour, lag_seconds, paused)
    let rows = sqlx::query_as::<
        _,
        (
            String,
            i64,
            i64,
            i64,
            i64,
            i64,
            i64,
            i64,
            i64,
            Option<f64>,
            bool,
        ),
    >(
        r#"
        WITH available_lag AS (
            SELECT
                queue,
                EXTRACT(EPOCH FROM (now() - min(run_at)))::float8 AS lag_seconds
            FROM awa.jobs_hot
            WHERE state = 'available'
            GROUP BY queue
        ),
        completed_recent AS (
            SELECT
                queue,
                count(*)::bigint AS completed_last_hour
            FROM awa.jobs_hot
            WHERE state = 'completed'
              AND finalized_at > now() - interval '1 hour'
            GROUP BY queue
        )
        SELECT
            qs.queue,
            qs.scheduled + qs.available + qs.running + qs.retryable + qs.waiting_external AS total_queued,
            qs.scheduled,
            qs.available,
            qs.retryable,
            qs.running,
            qs.failed,
            qs.waiting_external,
            COALESCE(cr.completed_last_hour, 0) AS completed_last_hour,
            al.lag_seconds,
            COALESCE(qm.paused, FALSE) AS paused
        FROM awa.queue_state_counts qs
        LEFT JOIN available_lag al ON al.queue = qs.queue
        LEFT JOIN completed_recent cr ON cr.queue = qs.queue
        LEFT JOIN awa.queue_meta qm ON qm.queue = qs.queue
        ORDER BY qs.queue
        "#,
    )
    .fetch_all(executor)
    .await?;

    // Destructure each tuple positionally into the named struct.
    Ok(rows
        .into_iter()
        .map(
            |(
                queue,
                total_queued,
                scheduled,
                available,
                retryable,
                running,
                failed,
                waiting_external,
                completed_last_hour,
                lag_seconds,
                paused,
            )| QueueStats {
                queue,
                total_queued,
                scheduled,
                available,
                retryable,
                running,
                failed,
                waiting_external,
                completed_last_hour,
                lag_seconds,
                paused,
            },
        )
        .collect())
}
753
/// List jobs with optional filters.
///
/// All fields are optional; `None` means "don't filter on this". Used by
/// [`list_jobs`].
#[derive(Debug, Clone, Default, Serialize)]
pub struct ListJobsFilter {
    pub state: Option<JobState>,
    pub kind: Option<String>,
    pub queue: Option<String>,
    /// Matches jobs whose `tags` array contains this tag.
    pub tag: Option<String>,
    /// Keyset-pagination cursor: only return jobs with `id` strictly below this.
    pub before_id: Option<i64>,
    /// Page size; defaults to 100 when `None`.
    pub limit: Option<i64>,
}
764
765/// List jobs matching the given filter.
766pub async fn list_jobs<'e, E>(executor: E, filter: &ListJobsFilter) -> Result<Vec<JobRow>, AwaError>
767where
768    E: PgExecutor<'e>,
769{
770    let limit = filter.limit.unwrap_or(100);
771
772    let rows = sqlx::query_as::<_, JobRow>(
773        r#"
774        SELECT * FROM awa.jobs
775        WHERE ($1::awa.job_state IS NULL OR state = $1)
776          AND ($2::text IS NULL OR kind = $2)
777          AND ($3::text IS NULL OR queue = $3)
778          AND ($4::text IS NULL OR tags @> ARRAY[$4]::text[])
779          AND ($5::bigint IS NULL OR id < $5)
780        ORDER BY id DESC
781        LIMIT $6
782        "#,
783    )
784    .bind(filter.state)
785    .bind(&filter.kind)
786    .bind(&filter.queue)
787    .bind(&filter.tag)
788    .bind(filter.before_id)
789    .bind(limit)
790    .fetch_all(executor)
791    .await?;
792
793    Ok(rows)
794}
795
796/// Get a single job by ID.
797pub async fn get_job<'e, E>(executor: E, job_id: i64) -> Result<JobRow, AwaError>
798where
799    E: PgExecutor<'e>,
800{
801    let row = sqlx::query_as::<_, JobRow>("SELECT * FROM awa.jobs WHERE id = $1")
802        .bind(job_id)
803        .fetch_optional(executor)
804        .await?;
805
806    row.ok_or(AwaError::JobNotFound { id: job_id })
807}
808
809/// Count jobs grouped by state.
810///
811/// Reads from the `queue_state_counts` cache table.
812pub async fn state_counts<'e, E>(executor: E) -> Result<HashMap<JobState, i64>, AwaError>
813where
814    E: PgExecutor<'e>,
815{
816    // Single scan of queue_state_counts — sums all columns in one pass
817    // then unpivots via VALUES join.
818    let rows = sqlx::query_as::<_, (JobState, i64)>(
819        r#"
820        SELECT v.state, v.total FROM (
821            SELECT
822                COALESCE(sum(scheduled), 0)::bigint      AS scheduled,
823                COALESCE(sum(available), 0)::bigint      AS available,
824                COALESCE(sum(running), 0)::bigint        AS running,
825                COALESCE(sum(completed), 0)::bigint      AS completed,
826                COALESCE(sum(retryable), 0)::bigint      AS retryable,
827                COALESCE(sum(failed), 0)::bigint         AS failed,
828                COALESCE(sum(cancelled), 0)::bigint      AS cancelled,
829                COALESCE(sum(waiting_external), 0)::bigint AS waiting_external
830            FROM awa.queue_state_counts
831        ) s,
832        LATERAL (VALUES
833            ('scheduled'::awa.job_state,        s.scheduled),
834            ('available'::awa.job_state,        s.available),
835            ('running'::awa.job_state,          s.running),
836            ('completed'::awa.job_state,        s.completed),
837            ('retryable'::awa.job_state,        s.retryable),
838            ('failed'::awa.job_state,           s.failed),
839            ('cancelled'::awa.job_state,        s.cancelled),
840            ('waiting_external'::awa.job_state, s.waiting_external)
841        ) AS v(state, total)
842        "#,
843    )
844    .fetch_all(executor)
845    .await?;
846
847    Ok(rows.into_iter().collect())
848}
849
850/// Return all distinct job kinds.
851///
852/// Reads from the `job_kind_catalog` cache table.
853pub async fn distinct_kinds<'e, E>(executor: E) -> Result<Vec<String>, AwaError>
854where
855    E: PgExecutor<'e>,
856{
857    let rows = sqlx::query_scalar::<_, String>(
858        "SELECT kind FROM awa.job_kind_catalog WHERE ref_count > 0 ORDER BY kind",
859    )
860    .fetch_all(executor)
861    .await?;
862
863    Ok(rows)
864}
865
866/// Return all distinct queue names.
867///
868/// Reads from the `job_queue_catalog` cache table.
869pub async fn distinct_queues<'e, E>(executor: E) -> Result<Vec<String>, AwaError>
870where
871    E: PgExecutor<'e>,
872{
873    let rows = sqlx::query_scalar::<_, String>(
874        "SELECT queue FROM awa.job_queue_catalog WHERE ref_count > 0 ORDER BY queue",
875    )
876    .fetch_all(executor)
877    .await?;
878
879    Ok(rows)
880}
881
882/// Drain one batch of dirty keys and recompute exact cached rows.
883/// Returns the number of dirty keys processed in this batch.
884///
885/// Called frequently by the maintenance leader (~2s). Uses per-queue
886/// indexes for targeted recompute rather than full table scans.
887pub async fn recompute_dirty_admin_metadata(pool: &PgPool) -> Result<i32, AwaError> {
888    let count: i32 = sqlx::query_scalar("SELECT awa.recompute_dirty_admin_metadata(100)")
889        .fetch_one(pool)
890        .await?;
891    Ok(count)
892}
893
894/// Drain ALL dirty keys until the backlog is empty.
895///
896/// Use in tests or admin tooling where you need the cache to be fully
897/// consistent before reading. Each call to the underlying SQL function
898/// acquires a blocking advisory lock, so concurrent callers serialize
899/// rather than skip.
900pub async fn flush_dirty_admin_metadata(pool: &PgPool) -> Result<i32, AwaError> {
901    let mut total = 0i32;
902    loop {
903        let count: i32 = sqlx::query_scalar("SELECT awa.recompute_dirty_admin_metadata(100)")
904            .fetch_one(pool)
905            .await?;
906        total += count;
907        if count == 0 {
908            break;
909        }
910    }
911    Ok(total)
912}
913
914/// Full reconciliation of admin metadata counters from base tables.
915///
916/// Called infrequently by the maintenance leader (~60s) as a safety net
917/// to correct any drift from skipped dirty keys. Also called during
918/// migrate() to warm the cache.
919pub async fn refresh_admin_metadata(pool: &PgPool) -> Result<(), AwaError> {
920    sqlx::query("SELECT awa.refresh_admin_metadata()")
921        .execute(pool)
922        .await?;
923    Ok(())
924}
925
926/// Retry multiple jobs by ID. Only retries failed, cancelled, or waiting_external jobs.
927pub async fn bulk_retry<'e, E>(executor: E, ids: &[i64]) -> Result<Vec<JobRow>, AwaError>
928where
929    E: PgExecutor<'e>,
930{
931    let rows = sqlx::query_as::<_, JobRow>(
932        r#"
933        UPDATE awa.jobs
934        SET state = 'available', attempt = 0, run_at = now(),
935            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
936            callback_id = NULL, callback_timeout_at = NULL,
937            callback_filter = NULL, callback_on_complete = NULL,
938            callback_on_fail = NULL, callback_transform = NULL
939        WHERE id = ANY($1) AND state IN ('failed', 'cancelled', 'waiting_external')
940        RETURNING *
941        "#,
942    )
943    .bind(ids)
944    .fetch_all(executor)
945    .await?;
946
947    Ok(rows)
948}
949
950/// Cancel multiple jobs by ID. Only cancels non-terminal jobs.
951pub async fn bulk_cancel<'e, E>(executor: E, ids: &[i64]) -> Result<Vec<JobRow>, AwaError>
952where
953    E: PgExecutor<'e>,
954{
955    let rows = sqlx::query_as::<_, JobRow>(
956        r#"
957        UPDATE awa.jobs
958        SET state = 'cancelled', finalized_at = now(),
959            callback_id = NULL, callback_timeout_at = NULL,
960            callback_filter = NULL, callback_on_complete = NULL,
961            callback_on_fail = NULL, callback_transform = NULL
962        WHERE id = ANY($1) AND state NOT IN ('completed', 'failed', 'cancelled')
963        RETURNING *
964        "#,
965    )
966    .bind(ids)
967    .fetch_all(executor)
968    .await?;
969
970    Ok(rows)
971}
972
/// A bucketed count of jobs by state over time.
#[derive(Debug, Clone, Serialize)]
pub struct StateTimeseriesBucket {
    /// Start of the bucket (minute-truncated `created_at` in `state_timeseries`).
    pub bucket: chrono::DateTime<chrono::Utc>,
    /// The job state being counted within this bucket.
    pub state: JobState,
    /// Number of jobs created in this bucket with this state.
    pub count: i64,
}
980
981/// Return time-bucketed state counts over the last N minutes.
982pub async fn state_timeseries<'e, E>(
983    executor: E,
984    minutes: i32,
985) -> Result<Vec<StateTimeseriesBucket>, AwaError>
986where
987    E: PgExecutor<'e>,
988{
989    let rows = sqlx::query_as::<_, (chrono::DateTime<chrono::Utc>, JobState, i64)>(
990        r#"
991        SELECT
992            date_trunc('minute', created_at) AS bucket,
993            state,
994            count(*) AS count
995        FROM awa.jobs
996        WHERE created_at >= now() - make_interval(mins => $1)
997        GROUP BY bucket, state
998        ORDER BY bucket
999        "#,
1000    )
1001    .bind(minutes)
1002    .fetch_all(executor)
1003    .await?;
1004
1005    Ok(rows
1006        .into_iter()
1007        .map(|(bucket, state, count)| StateTimeseriesBucket {
1008            bucket,
1009            state,
1010            count,
1011        })
1012        .collect())
1013}
1014
1015/// Register a callback for a running job, writing the callback_id and timeout
1016/// to the database immediately.
1017///
1018/// Call this BEFORE sending the callback_id to the external system to avoid
1019/// the race condition where the external system fires before the DB knows
1020/// about the callback.
1021///
1022/// Returns the generated callback UUID on success.
1023pub async fn register_callback<'e, E>(
1024    executor: E,
1025    job_id: i64,
1026    run_lease: i64,
1027    timeout: std::time::Duration,
1028) -> Result<Uuid, AwaError>
1029where
1030    E: PgExecutor<'e>,
1031{
1032    let callback_id = Uuid::new_v4();
1033    let timeout_secs = timeout.as_secs_f64();
1034    let result = sqlx::query(
1035        r#"UPDATE awa.jobs
1036           SET callback_id = $2,
1037               callback_timeout_at = now() + make_interval(secs => $3),
1038               callback_filter = NULL,
1039               callback_on_complete = NULL,
1040               callback_on_fail = NULL,
1041               callback_transform = NULL
1042           WHERE id = $1 AND state = 'running' AND run_lease = $4"#,
1043    )
1044    .bind(job_id)
1045    .bind(callback_id)
1046    .bind(timeout_secs)
1047    .bind(run_lease)
1048    .execute(executor)
1049    .await?;
1050    if result.rows_affected() == 0 {
1051        return Err(AwaError::Validation("job is not in running state".into()));
1052    }
1053    Ok(callback_id)
1054}
1055
1056/// Complete a waiting job via external callback.
1057///
1058/// Accepts jobs in `waiting_external` or `running` state (race handling: the
1059/// external system may fire before the executor transitions to `waiting_external`).
1060///
1061/// When `resume` is `false` (default), the job transitions to `completed`.
1062/// When `resume` is `true`, the job transitions back to `running` with the
1063/// callback payload stored in metadata under `_awa_callback_result`. The
1064/// handler can then read the result and continue processing (sequential
1065/// callback pattern from ADR-016).
1066pub async fn complete_external<'e, E>(
1067    executor: E,
1068    callback_id: Uuid,
1069    payload: Option<serde_json::Value>,
1070    run_lease: Option<i64>,
1071) -> Result<JobRow, AwaError>
1072where
1073    E: PgExecutor<'e>,
1074{
1075    complete_external_inner(executor, callback_id, payload, run_lease, false).await
1076}
1077
1078/// Complete a waiting job and resume the handler with the callback payload.
1079///
1080/// Like `complete_external`, but the job transitions to `running` instead of
1081/// `completed`, allowing the handler to continue with sequential callbacks.
1082/// The payload is stored in `metadata._awa_callback_result`.
1083pub async fn resume_external<'e, E>(
1084    executor: E,
1085    callback_id: Uuid,
1086    payload: Option<serde_json::Value>,
1087    run_lease: Option<i64>,
1088) -> Result<JobRow, AwaError>
1089where
1090    E: PgExecutor<'e>,
1091{
1092    complete_external_inner(executor, callback_id, payload, run_lease, true).await
1093}
1094
1095async fn complete_external_inner<'e, E>(
1096    executor: E,
1097    callback_id: Uuid,
1098    payload: Option<serde_json::Value>,
1099    run_lease: Option<i64>,
1100    resume: bool,
1101) -> Result<JobRow, AwaError>
1102where
1103    E: PgExecutor<'e>,
1104{
1105    let row = if resume {
1106        // Resume: transition to running, store payload, refresh heartbeat.
1107        // The handler is still alive and polling — it will detect the state change.
1108        let payload_json = payload.unwrap_or(serde_json::Value::Null);
1109        sqlx::query_as::<_, JobRow>(
1110            r#"
1111            UPDATE awa.jobs
1112            SET state = 'running',
1113                callback_id = NULL,
1114                callback_timeout_at = NULL,
1115                callback_filter = NULL,
1116                callback_on_complete = NULL,
1117                callback_on_fail = NULL,
1118                callback_transform = NULL,
1119                heartbeat_at = now(),
1120                metadata = metadata || jsonb_build_object('_awa_callback_result', $3::jsonb)
1121            WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
1122              AND ($2::bigint IS NULL OR run_lease = $2)
1123            RETURNING *
1124            "#,
1125        )
1126        .bind(callback_id)
1127        .bind(run_lease)
1128        .bind(&payload_json)
1129        .fetch_optional(executor)
1130        .await?
1131    } else {
1132        // Complete: terminal state, clear everything.
1133        sqlx::query_as::<_, JobRow>(
1134            r#"
1135            UPDATE awa.jobs
1136            SET state = 'completed',
1137                finalized_at = now(),
1138                callback_id = NULL,
1139                callback_timeout_at = NULL,
1140                callback_filter = NULL,
1141                callback_on_complete = NULL,
1142                callback_on_fail = NULL,
1143                callback_transform = NULL,
1144                heartbeat_at = NULL,
1145                deadline_at = NULL,
1146                progress = NULL
1147            WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
1148              AND ($2::bigint IS NULL OR run_lease = $2)
1149            RETURNING *
1150            "#,
1151        )
1152        .bind(callback_id)
1153        .bind(run_lease)
1154        .fetch_optional(executor)
1155        .await?
1156    };
1157
1158    row.ok_or(AwaError::CallbackNotFound {
1159        callback_id: callback_id.to_string(),
1160    })
1161}
1162
1163/// Fail a waiting job via external callback.
1164///
1165/// Records the error and transitions to `failed`.
1166pub async fn fail_external<'e, E>(
1167    executor: E,
1168    callback_id: Uuid,
1169    error: &str,
1170    run_lease: Option<i64>,
1171) -> Result<JobRow, AwaError>
1172where
1173    E: PgExecutor<'e>,
1174{
1175    let row = sqlx::query_as::<_, JobRow>(
1176        r#"
1177        UPDATE awa.jobs
1178        SET state = 'failed',
1179            finalized_at = now(),
1180            callback_id = NULL,
1181            callback_timeout_at = NULL,
1182            callback_filter = NULL,
1183            callback_on_complete = NULL,
1184            callback_on_fail = NULL,
1185            callback_transform = NULL,
1186            heartbeat_at = NULL,
1187            deadline_at = NULL,
1188            errors = errors || jsonb_build_object(
1189                'error', $2::text,
1190                'attempt', attempt,
1191                'at', now()
1192            )::jsonb
1193        WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
1194          AND ($3::bigint IS NULL OR run_lease = $3)
1195        RETURNING *
1196        "#,
1197    )
1198    .bind(callback_id)
1199    .bind(error)
1200    .bind(run_lease)
1201    .fetch_optional(executor)
1202    .await?;
1203
1204    row.ok_or(AwaError::CallbackNotFound {
1205        callback_id: callback_id.to_string(),
1206    })
1207}
1208
1209/// Retry a waiting job via external callback.
1210///
1211/// Resets to `available` with attempt = 0. The handler must be idempotent
1212/// with respect to the external call — a retry re-executes from scratch.
1213///
1214/// Only accepts `waiting_external` state — unlike complete/fail which are
1215/// terminal transitions, retry puts the job back to `available`. Allowing
1216/// retry from `running` would risk concurrent dispatch if the original
1217/// handler hasn't finished yet.
1218pub async fn retry_external<'e, E>(
1219    executor: E,
1220    callback_id: Uuid,
1221    run_lease: Option<i64>,
1222) -> Result<JobRow, AwaError>
1223where
1224    E: PgExecutor<'e>,
1225{
1226    let row = sqlx::query_as::<_, JobRow>(
1227        r#"
1228        UPDATE awa.jobs
1229        SET state = 'available',
1230            attempt = 0,
1231            run_at = now(),
1232            finalized_at = NULL,
1233            callback_id = NULL,
1234            callback_timeout_at = NULL,
1235            callback_filter = NULL,
1236            callback_on_complete = NULL,
1237            callback_on_fail = NULL,
1238            callback_transform = NULL,
1239            heartbeat_at = NULL,
1240            deadline_at = NULL
1241        WHERE callback_id = $1 AND state = 'waiting_external'
1242          AND ($2::bigint IS NULL OR run_lease = $2)
1243        RETURNING *
1244        "#,
1245    )
1246    .bind(callback_id)
1247    .bind(run_lease)
1248    .fetch_optional(executor)
1249    .await?;
1250
1251    row.ok_or(AwaError::CallbackNotFound {
1252        callback_id: callback_id.to_string(),
1253    })
1254}
1255
1256/// Reset the callback timeout for a long-running external operation.
1257///
1258/// External systems call this periodically to signal "still working" without
1259/// completing the job. Resets `callback_timeout_at` to `now() + timeout`.
1260/// The job stays in `waiting_external`.
1261///
1262/// Returns the updated job row, or `CallbackNotFound` if the callback ID
1263/// doesn't match a waiting job.
1264pub async fn heartbeat_callback<'e, E>(
1265    executor: E,
1266    callback_id: Uuid,
1267    timeout: std::time::Duration,
1268) -> Result<JobRow, AwaError>
1269where
1270    E: PgExecutor<'e>,
1271{
1272    let timeout_secs = timeout.as_secs_f64();
1273    let row = sqlx::query_as::<_, JobRow>(
1274        r#"
1275        UPDATE awa.jobs
1276        SET callback_timeout_at = now() + make_interval(secs => $2)
1277        WHERE callback_id = $1 AND state = 'waiting_external'
1278        RETURNING *
1279        "#,
1280    )
1281    .bind(callback_id)
1282    .bind(timeout_secs)
1283    .fetch_optional(executor)
1284    .await?;
1285
1286    row.ok_or(AwaError::CallbackNotFound {
1287        callback_id: callback_id.to_string(),
1288    })
1289}
1290
1291/// Cancel (clear) a registered callback for a running job.
1292///
1293/// Best-effort cleanup: returns `Ok(true)` if a row was updated,
1294/// `Ok(false)` if no match (already resolved, rescued, or wrong lease).
1295/// Callers should not treat `false` as an error.
1296pub async fn cancel_callback<'e, E>(
1297    executor: E,
1298    job_id: i64,
1299    run_lease: i64,
1300) -> Result<bool, AwaError>
1301where
1302    E: PgExecutor<'e>,
1303{
1304    let result = sqlx::query(
1305        r#"
1306        UPDATE awa.jobs
1307        SET callback_id = NULL,
1308            callback_timeout_at = NULL,
1309            callback_filter = NULL,
1310            callback_on_complete = NULL,
1311            callback_on_fail = NULL,
1312            callback_transform = NULL
1313        WHERE id = $1 AND callback_id IS NOT NULL AND state = 'running' AND run_lease = $2
1314        "#,
1315    )
1316    .bind(job_id)
1317    .bind(run_lease)
1318    .execute(executor)
1319    .await?;
1320
1321    Ok(result.rows_affected() > 0)
1322}
1323
1324// ── Sequential callback wait helpers ─────────────────────────────────
1325//
1326// These functions extract the DB-interaction logic for `wait_for_callback`
1327// so that both the Rust `JobContext` and the Python bridge call the same
1328// code paths.
1329
/// Result of a single poll iteration inside `wait_for_callback`.
#[derive(Debug)]
pub enum CallbackPollResult {
    /// The callback was resolved and the payload is ready.
    Resolved(serde_json::Value),
    /// Still waiting — caller should sleep and poll again.
    Pending,
    /// The callback token is stale (a different callback is current).
    Stale {
        /// The token the waiter was polling with.
        token: Uuid,
        /// The callback id currently registered on the job.
        current: Uuid,
        /// Job state observed at poll time.
        state: JobState,
    },
    /// The job left the wait unexpectedly (rescued, cancelled, etc.).
    UnexpectedState { token: Uuid, state: JobState },
    /// The job was not found.
    NotFound,
}
1348
1349/// Transition a running job to `waiting_external` for the given callback.
1350///
1351/// Returns `Ok(true)` if the transition succeeded, `Ok(false)` if the row
1352/// did not match (the caller should check for an early-resume race).
1353pub async fn enter_callback_wait(
1354    pool: &PgPool,
1355    job_id: i64,
1356    run_lease: i64,
1357    callback_id: Uuid,
1358) -> Result<bool, AwaError> {
1359    let result = sqlx::query(
1360        r#"
1361        UPDATE awa.jobs
1362        SET state = 'waiting_external',
1363            heartbeat_at = NULL,
1364            deadline_at = NULL
1365        WHERE id = $1 AND state = 'running' AND run_lease = $2 AND callback_id = $3
1366        "#,
1367    )
1368    .bind(job_id)
1369    .bind(run_lease)
1370    .bind(callback_id)
1371    .execute(pool)
1372    .await?;
1373
1374    Ok(result.rows_affected() > 0)
1375}
1376
/// Check the current state of a job during callback wait.
///
/// Handles the early-resume race: if `resume_external` won the race before
/// the handler transitioned to `waiting_external`, the callback result is
/// already in metadata and this returns `Resolved`.
pub async fn check_callback_state(
    pool: &PgPool,
    job_id: i64,
    callback_id: Uuid,
) -> Result<CallbackPollResult, AwaError> {
    // Single snapshot of the three fields the match below inspects.
    let row: Option<(JobState, Option<Uuid>, serde_json::Value)> =
        sqlx::query_as("SELECT state, callback_id, metadata FROM awa.jobs WHERE id = $1")
            .bind(job_id)
            .fetch_optional(pool)
            .await?;

    // NOTE: arm order is significant — the early-resume arm must be tested
    // before the generic state arms, or a resumed job would be reported as
    // UnexpectedState.
    match row {
        // Early-resume race: `resume_external` already flipped the job back
        // to 'running', cleared callback_id, and stashed the payload under
        // `_awa_callback_result` in metadata.
        Some((JobState::Running, None, metadata))
            if metadata.get("_awa_callback_result").is_some() =>
        {
            let payload = take_callback_payload(pool, job_id, metadata).await?;
            Ok(CallbackPollResult::Resolved(payload))
        }
        // A different callback id is now registered on the job — our token
        // is stale regardless of state.
        Some((state, Some(current_callback_id), _)) if current_callback_id != callback_id => {
            Ok(CallbackPollResult::Stale {
                token: callback_id,
                current: current_callback_id,
                state,
            })
        }
        // Our callback is still the current one and the job is parked:
        // keep polling.
        Some((JobState::WaitingExternal, Some(current), _)) if current == callback_id => {
            Ok(CallbackPollResult::Pending)
        }
        // Anything else (e.g. terminal state, or running without a stored
        // result) is unexpected from the waiter's point of view.
        Some((state, _, _)) => Ok(CallbackPollResult::UnexpectedState {
            token: callback_id,
            state,
        }),
        None => Ok(CallbackPollResult::NotFound),
    }
}
1417
1418/// Extract the `_awa_callback_result` key from metadata and clean it up.
1419pub async fn take_callback_payload(
1420    pool: &PgPool,
1421    job_id: i64,
1422    metadata: serde_json::Value,
1423) -> Result<serde_json::Value, AwaError> {
1424    let payload = metadata
1425        .get("_awa_callback_result")
1426        .cloned()
1427        .unwrap_or(serde_json::Value::Null);
1428
1429    sqlx::query("UPDATE awa.jobs SET metadata = metadata - '_awa_callback_result' WHERE id = $1")
1430        .bind(job_id)
1431        .execute(pool)
1432        .await?;
1433
1434    Ok(payload)
1435}
1436
1437// ── CEL callback expressions ──────────────────────────────────────────
1438
/// Configuration for CEL callback expressions.
///
/// All fields are optional. When all are `None`, behaviour is identical to
/// the original `register_callback` (no expression evaluation).
#[derive(Debug, Clone, Default)]
pub struct CallbackConfig {
    /// Gate: should this payload be processed at all? Returns bool.
    pub filter: Option<String>,
    /// Does this payload indicate success? Returns bool.
    pub on_complete: Option<String>,
    /// Does this payload indicate failure? Returns bool. Evaluated before on_complete.
    pub on_fail: Option<String>,
    /// Reshape payload before returning to caller. Returns any Value.
    pub transform: Option<String>,
}

impl CallbackConfig {
    /// Returns true if no expressions are configured.
    pub fn is_empty(&self) -> bool {
        [
            &self.filter,
            &self.on_complete,
            &self.on_fail,
            &self.transform,
        ]
        .iter()
        .all(|expr| expr.is_none())
    }
}
1464
/// What `resolve_callback` should do if no CEL conditions match or no
/// expressions are configured.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DefaultAction {
    /// Complete the job, passing the raw payload through.
    Complete,
    /// Fail the job with a generic "default action" error.
    Fail,
    /// Make no state change; the job keeps waiting.
    Ignore,
}
1473
/// Outcome of `resolve_callback`.
#[derive(Debug)]
pub enum ResolveOutcome {
    /// The job was transitioned to `completed`.
    Completed {
        /// Payload after any CEL transform (the raw payload when none ran).
        payload: Option<serde_json::Value>,
        /// The updated job row returned by the UPDATE.
        job: JobRow,
    },
    /// The job was transitioned to `failed`.
    Failed {
        job: JobRow,
    },
    /// No state change was made (e.g. the filter expression rejected the payload).
    Ignored {
        /// Human-readable explanation of why the callback was ignored.
        reason: String,
    },
}

impl ResolveOutcome {
    /// True if the callback completed the job.
    pub fn is_completed(&self) -> bool {
        matches!(self, ResolveOutcome::Completed { .. })
    }
    /// True if the callback failed the job.
    pub fn is_failed(&self) -> bool {
        matches!(self, ResolveOutcome::Failed { .. })
    }
    /// True if the callback left the job untouched.
    pub fn is_ignored(&self) -> bool {
        matches!(self, ResolveOutcome::Ignored { .. })
    }
}
1500
1501/// Register a callback with optional CEL expressions.
1502///
1503/// When expressions are provided and the `cel` feature is enabled, each
1504/// expression is trial-compiled at registration time so syntax errors are
1505/// caught early.
1506///
1507/// When the `cel` feature is disabled and any expression is non-None,
1508/// returns `AwaError::Validation`.
1509pub async fn register_callback_with_config<'e, E>(
1510    executor: E,
1511    job_id: i64,
1512    run_lease: i64,
1513    timeout: std::time::Duration,
1514    config: &CallbackConfig,
1515) -> Result<Uuid, AwaError>
1516where
1517    E: PgExecutor<'e>,
1518{
1519    // Validate CEL expressions at registration time: compile + check references
1520    #[cfg(feature = "cel")]
1521    {
1522        for (name, expr) in [
1523            ("filter", &config.filter),
1524            ("on_complete", &config.on_complete),
1525            ("on_fail", &config.on_fail),
1526            ("transform", &config.transform),
1527        ] {
1528            if let Some(src) = expr {
1529                let program = cel::Program::compile(src).map_err(|e| {
1530                    AwaError::Validation(format!("invalid CEL expression for {name}: {e}"))
1531                })?;
1532
1533                // Reject undeclared variables — CEL only reports these at execution
1534                // time, so an expression like `missing == 1` would parse fine but
1535                // silently fall into the fail-open path at resolve time.
1536                let refs = program.references();
1537                let bad_vars: Vec<&str> = refs
1538                    .variables()
1539                    .into_iter()
1540                    .filter(|v| *v != "payload")
1541                    .collect();
1542                if !bad_vars.is_empty() {
1543                    return Err(AwaError::Validation(format!(
1544                        "CEL expression for {name} references undeclared variable(s): {}; \
1545                         only 'payload' is available",
1546                        bad_vars.join(", ")
1547                    )));
1548                }
1549            }
1550        }
1551    }
1552
1553    #[cfg(not(feature = "cel"))]
1554    {
1555        if !config.is_empty() {
1556            return Err(AwaError::Validation(
1557                "CEL expressions require the 'cel' feature".into(),
1558            ));
1559        }
1560    }
1561
1562    let callback_id = Uuid::new_v4();
1563    let timeout_secs = timeout.as_secs_f64();
1564
1565    let result = sqlx::query(
1566        r#"UPDATE awa.jobs
1567           SET callback_id = $2,
1568               callback_timeout_at = now() + make_interval(secs => $3),
1569               callback_filter = $4,
1570               callback_on_complete = $5,
1571               callback_on_fail = $6,
1572               callback_transform = $7
1573           WHERE id = $1 AND state = 'running' AND run_lease = $8"#,
1574    )
1575    .bind(job_id)
1576    .bind(callback_id)
1577    .bind(timeout_secs)
1578    .bind(&config.filter)
1579    .bind(&config.on_complete)
1580    .bind(&config.on_fail)
1581    .bind(&config.transform)
1582    .bind(run_lease)
1583    .execute(executor)
1584    .await?;
1585
1586    if result.rows_affected() == 0 {
1587        return Err(AwaError::Validation("job is not in running state".into()));
1588    }
1589    Ok(callback_id)
1590}
1591
/// Internal action decided by CEL evaluation or default.
enum ResolveAction {
    /// Terminally complete the job; carries the (possibly transformed) payload.
    Complete(Option<serde_json::Value>),
    /// Terminally fail the job with an error message and, when CEL-driven,
    /// the expression that matched.
    Fail {
        error: String,
        expression: Option<String>,
    },
    /// Leave the job untouched; the string explains why.
    Ignore(String),
}
1601
/// Resolve a callback by evaluating CEL expressions against the payload.
///
/// Uses a transaction with `SELECT ... FOR UPDATE` for atomicity.
/// The `default_action` determines behaviour when no CEL conditions match
/// or no expressions are configured.
///
/// Returns `CallbackNotFound` when no waiting/running job matches the
/// callback id (and optional lease); any CEL/feature problem from
/// `evaluate_or_default` is propagated without mutating the job.
pub async fn resolve_callback(
    pool: &PgPool,
    callback_id: Uuid,
    payload: Option<serde_json::Value>,
    default_action: DefaultAction,
    run_lease: Option<i64>,
) -> Result<ResolveOutcome, AwaError> {
    let mut tx = pool.begin().await?;

    // Query jobs_hot directly (not the awa.jobs UNION ALL view) because
    // FOR UPDATE is not reliably supported on UNION views. Waiting_external
    // and running jobs are always in jobs_hot (the check constraint on
    // scheduled_jobs only allows scheduled/retryable).
    //
    // Accepts both 'waiting_external' and 'running' to handle the race where
    // a fast callback arrives before the executor transitions running ->
    // waiting_external (matching complete_external/fail_external behavior).
    let job = sqlx::query_as::<_, JobRow>(
        "SELECT * FROM awa.jobs_hot WHERE callback_id = $1
         AND state IN ('waiting_external', 'running')
         AND ($2::bigint IS NULL OR run_lease = $2)
         FOR UPDATE",
    )
    .bind(callback_id)
    .bind(run_lease)
    .fetch_optional(&mut *tx)
    .await?
    .ok_or(AwaError::CallbackNotFound {
        callback_id: callback_id.to_string(),
    })?;

    // Decide what to do while holding the row lock; an Err here rolls the
    // transaction back and leaves the job untouched.
    let action = evaluate_or_default(&job, &payload, default_action)?;

    match action {
        // Terminal success: clear all callback/heartbeat/progress columns.
        ResolveAction::Complete(transformed_payload) => {
            let completed_job = sqlx::query_as::<_, JobRow>(
                r#"
                UPDATE awa.jobs
                SET state = 'completed',
                    finalized_at = now(),
                    callback_id = NULL,
                    callback_timeout_at = NULL,
                    callback_filter = NULL,
                    callback_on_complete = NULL,
                    callback_on_fail = NULL,
                    callback_transform = NULL,
                    heartbeat_at = NULL,
                    deadline_at = NULL,
                    progress = NULL
                WHERE id = $1
                RETURNING *
                "#,
            )
            .bind(job.id)
            .fetch_one(&mut *tx)
            .await?;

            tx.commit().await?;
            Ok(ResolveOutcome::Completed {
                payload: transformed_payload,
                job: completed_job,
            })
        }
        // Terminal failure: record the error (plus the matching CEL
        // expression when one drove the decision) in the errors array.
        ResolveAction::Fail { error, expression } => {
            let mut error_json = serde_json::json!({
                "error": error,
                "attempt": job.attempt,
                "at": chrono::Utc::now().to_rfc3339(),
            });
            if let Some(expr) = expression {
                error_json["expression"] = serde_json::Value::String(expr);
            }

            let failed_job = sqlx::query_as::<_, JobRow>(
                r#"
                UPDATE awa.jobs
                SET state = 'failed',
                    finalized_at = now(),
                    callback_id = NULL,
                    callback_timeout_at = NULL,
                    callback_filter = NULL,
                    callback_on_complete = NULL,
                    callback_on_fail = NULL,
                    callback_transform = NULL,
                    heartbeat_at = NULL,
                    deadline_at = NULL,
                    errors = errors || $2::jsonb
                WHERE id = $1
                RETURNING *
                "#,
            )
            .bind(job.id)
            .bind(error_json)
            .fetch_one(&mut *tx)
            .await?;

            tx.commit().await?;
            Ok(ResolveOutcome::Failed { job: failed_job })
        }
        ResolveAction::Ignore(reason) => {
            // No state change — dropping tx releases FOR UPDATE lock
            Ok(ResolveOutcome::Ignored { reason })
        }
    }
}
1712
1713/// Evaluate CEL expressions or fall through to default_action.
1714fn evaluate_or_default(
1715    job: &JobRow,
1716    payload: &Option<serde_json::Value>,
1717    default_action: DefaultAction,
1718) -> Result<ResolveAction, AwaError> {
1719    let has_expressions = job.callback_filter.is_some()
1720        || job.callback_on_complete.is_some()
1721        || job.callback_on_fail.is_some()
1722        || job.callback_transform.is_some();
1723
1724    if !has_expressions {
1725        return Ok(apply_default(default_action, payload));
1726    }
1727
1728    #[cfg(feature = "cel")]
1729    {
1730        Ok(evaluate_cel(job, payload, default_action))
1731    }
1732
1733    #[cfg(not(feature = "cel"))]
1734    {
1735        // Expressions are present but CEL feature is not enabled.
1736        // Return an error without mutating the job — it stays in waiting_external.
1737        let _ = (payload, default_action);
1738        Err(AwaError::Validation(
1739            "CEL expressions present but 'cel' feature is not enabled".into(),
1740        ))
1741    }
1742}
1743
1744fn apply_default(
1745    default_action: DefaultAction,
1746    payload: &Option<serde_json::Value>,
1747) -> ResolveAction {
1748    match default_action {
1749        DefaultAction::Complete => ResolveAction::Complete(payload.clone()),
1750        DefaultAction::Fail => ResolveAction::Fail {
1751            error: "callback failed: default action".to_string(),
1752            expression: None,
1753        },
1754        DefaultAction::Ignore => {
1755            ResolveAction::Ignore("no expressions configured, default is ignore".to_string())
1756        }
1757    }
1758}
1759
#[cfg(feature = "cel")]
fn evaluate_cel(
    job: &JobRow,
    payload: &Option<serde_json::Value>,
    default_action: DefaultAction,
) -> ResolveAction {
    // Expressions see the payload as a single `payload` variable; a missing
    // payload is presented as JSON null.
    let payload_value = payload.clone().unwrap_or(serde_json::Value::Null);

    // 1. Filter gate: Ok(false) drops the payload entirely. Evaluation
    //    errors fail open (treated as true — pass through).
    if let Some(filter_expr) = &job.callback_filter {
        if let Ok(false) = eval_bool(filter_expr, &payload_value, job.id, "filter") {
            return ResolveAction::Ignore("filter expression returned false".to_string());
        }
    }

    // 2. on_fail is checked before on_complete — failure takes precedence.
    //    Evaluation errors fail open (treated as false — don't fail).
    if let Some(on_fail_expr) = &job.callback_on_fail {
        if let Ok(true) = eval_bool(on_fail_expr, &payload_value, job.id, "on_fail") {
            return ResolveAction::Fail {
                error: "callback failed: on_fail expression matched".to_string(),
                expression: Some(on_fail_expr.clone()),
            };
        }
    }

    // 3. on_complete: a match completes with the (optionally transformed)
    //    payload. Evaluation errors fail open (treated as false).
    if let Some(on_complete_expr) = &job.callback_on_complete {
        if let Ok(true) = eval_bool(on_complete_expr, &payload_value, job.id, "on_complete") {
            let transformed = apply_transform(job, &payload_value);
            return ResolveAction::Complete(Some(transformed));
        }
    }

    // 4. Neither condition matched → fall through to the default action,
    //    which sees the original (untransformed) payload.
    apply_default(default_action, payload)
}
1815
#[cfg(feature = "cel")]
/// Compile and evaluate a CEL `expression` against the callback payload,
/// expecting a boolean result.
///
/// The payload is exposed to the expression as the variable `payload`.
/// Any failure (compile error, context setup error, execution error, or a
/// non-boolean result) is logged via `tracing::warn!` and surfaced as
/// `Err(())`; callers decide their own fail-open/fail-closed policy.
fn eval_bool(
    expression: &str,
    payload_value: &serde_json::Value,
    job_id: i64,
    expression_name: &str,
) -> Result<bool, ()> {
    // Compile first; a malformed expression is reported and bubbled up as Err.
    let program = cel::Program::compile(expression).map_err(|e| {
        tracing::warn!(
            job_id,
            expression_name,
            expression,
            error = %e,
            "CEL compilation error during evaluation"
        );
    })?;

    // Bind the callback payload into the evaluation context as `payload`.
    let mut ctx = cel::Context::default();
    ctx.add_variable("payload", payload_value.clone())
        .map_err(|e| {
            tracing::warn!(
                job_id,
                expression_name,
                error = %e,
                "Failed to add payload variable to CEL context"
            );
        })?;

    match program.execute(&ctx) {
        Ok(cel::Value::Bool(flag)) => Ok(flag),
        // A successfully evaluated non-bool is still an error for our purposes.
        Ok(other) => {
            tracing::warn!(
                job_id,
                expression_name,
                expression,
                result_type = ?other.type_of(),
                "CEL expression returned non-bool"
            );
            Err(())
        }
        Err(e) => {
            tracing::warn!(
                job_id,
                expression_name,
                expression,
                error = %e,
                "CEL execution error"
            );
            Err(())
        }
    }
}
1872
#[cfg(feature = "cel")]
/// Apply the job's optional `callback_transform` CEL expression to the
/// callback payload, returning the transformed JSON value.
///
/// Best-effort semantics: if no transform is configured, or if compiling,
/// executing, or JSON-converting the transform fails, the original
/// `payload_value` is returned unchanged (each failure is logged via
/// `tracing::warn!`).
fn apply_transform(job: &JobRow, payload_value: &serde_json::Value) -> serde_json::Value {
    // No transform configured: pass the payload through untouched.
    let Some(transform_expr) = &job.callback_transform else {
        return payload_value.clone();
    };

    // Shared fallback for every failure path below.
    let fallback = || payload_value.clone();

    let program = match cel::Program::compile(transform_expr) {
        Ok(compiled) => compiled,
        Err(e) => {
            tracing::warn!(
                job_id = job.id,
                expression = transform_expr,
                error = %e,
                "CEL transform compilation error, using original payload"
            );
            return fallback();
        }
    };

    // Expose the payload to the transform expression as `payload`.
    let mut ctx = cel::Context::default();
    if let Err(e) = ctx.add_variable("payload", payload_value.clone()) {
        tracing::warn!(
            job_id = job.id,
            error = %e,
            "Failed to add payload variable for transform"
        );
        return fallback();
    }

    let evaluated = match program.execute(&ctx) {
        Ok(value) => value,
        Err(e) => {
            tracing::warn!(
                job_id = job.id,
                expression = transform_expr,
                error = %e,
                "CEL transform execution error, using original payload"
            );
            return fallback();
        }
    };

    // The CEL result must round-trip into serde_json; otherwise keep the original.
    match evaluated.json() {
        Ok(json) => json,
        Err(e) => {
            tracing::warn!(
                job_id = job.id,
                expression = transform_expr,
                error = %e,
                "CEL transform result could not be converted to JSON, using original payload"
            );
            fallback()
        }
    }
}