use crate::error::AwaError;
use crate::job::{JobRow, JobState};
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use sqlx::types::Json;
use sqlx::PgExecutor;
use sqlx::PgPool;
use std::cmp::max;
use std::collections::HashMap;
use uuid::Uuid;
/// Re-enqueue a single terminal or externally-waiting job.
///
/// Resets the job to `available` with attempt 0, immediate `run_at`, and all
/// finalization/heartbeat/deadline/callback columns cleared. Only jobs in
/// `failed`, `cancelled`, or `waiting_external` state are eligible.
///
/// NOTE(review): a missing or ineligible job yields `Err(JobNotFound)`, so
/// `Ok(None)` is never actually produced — confirm whether the `Option` in
/// the signature is intentional before tightening it.
pub async fn retry<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
where
    E: PgExecutor<'e>,
{
    let updated = sqlx::query_as::<_, JobRow>(
        r#"
        UPDATE awa.jobs
        SET state = 'available', attempt = 0, run_at = now(),
            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
            callback_id = NULL, callback_timeout_at = NULL,
            callback_filter = NULL, callback_on_complete = NULL,
            callback_on_fail = NULL, callback_transform = NULL
        WHERE id = $1 AND state IN ('failed', 'cancelled', 'waiting_external')
        RETURNING *
        "#,
    )
    .bind(job_id)
    .fetch_optional(executor)
    .await?;
    match updated {
        Some(job) => Ok(Some(job)),
        None => Err(AwaError::JobNotFound { id: job_id }),
    }
}
/// Cancel a single non-terminal job.
///
/// Marks the job `cancelled`, stamps `finalized_at`, and clears every
/// callback bookkeeping column. Jobs already in a terminal state
/// (`completed`, `failed`, `cancelled`) are not touched.
///
/// NOTE(review): like [`retry`], a non-matching job produces
/// `Err(JobNotFound)`, so `Ok(None)` never occurs in practice.
pub async fn cancel<'e, E>(executor: E, job_id: i64) -> Result<Option<JobRow>, AwaError>
where
    E: PgExecutor<'e>,
{
    let cancelled = sqlx::query_as::<_, JobRow>(
        r#"
        UPDATE awa.jobs
        SET state = 'cancelled', finalized_at = now(),
            callback_id = NULL, callback_timeout_at = NULL,
            callback_filter = NULL, callback_on_complete = NULL,
            callback_on_fail = NULL, callback_transform = NULL
        WHERE id = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
        RETURNING *
        "#,
    )
    .bind(job_id)
    .fetch_optional(executor)
    .await?;
    match cancelled {
        Some(job) => Ok(Some(job)),
        None => Err(AwaError::JobNotFound { id: job_id }),
    }
}
/// Cancel at most one non-terminal job identified by its unique key.
///
/// The key is derived from `(kind, queue, args, period_bucket)` via
/// `crate::unique::compute_unique_key`. When several live jobs share the
/// key, the one with the smallest id (oldest) is cancelled.
///
/// Returns `Ok(None)` (not an error) when no matching live job exists —
/// unlike [`cancel`], which errors on a miss.
pub async fn cancel_by_unique_key<'e, E>(
    executor: E,
    kind: &str,
    queue: Option<&str>,
    args: Option<&serde_json::Value>,
    period_bucket: Option<i64>,
) -> Result<Option<JobRow>, AwaError>
where
    E: PgExecutor<'e>,
{
    let unique_key = crate::unique::compute_unique_key(kind, queue, args, period_bucket);
    // The candidates CTE scans awa.jobs_hot and awa.scheduled_jobs
    // separately — NOTE(review): presumably partitions/views of awa.jobs
    // with matching ids; confirm against the schema. The ORDER BY/LIMIT
    // applies to the whole UNION ALL, picking the single oldest candidate.
    let row = sqlx::query_as::<_, JobRow>(
        r#"
        WITH candidates AS (
            SELECT id FROM awa.jobs_hot
            WHERE unique_key = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
            UNION ALL
            SELECT id FROM awa.scheduled_jobs
            WHERE unique_key = $1 AND state NOT IN ('completed', 'failed', 'cancelled')
            ORDER BY id ASC
            LIMIT 1
        )
        UPDATE awa.jobs
        SET state = 'cancelled', finalized_at = now(),
            callback_id = NULL, callback_timeout_at = NULL,
            callback_filter = NULL, callback_on_complete = NULL,
            callback_on_fail = NULL, callback_transform = NULL
        FROM candidates
        WHERE awa.jobs.id = candidates.id
        RETURNING awa.jobs.*
        "#,
    )
    .bind(&unique_key)
    .fetch_optional(executor)
    .await?;
    Ok(row)
}
/// Re-enqueue every `failed` job of the given kind.
///
/// Resets each job to `available` with attempt 0 and immediate `run_at`.
/// Also clears the callback bookkeeping columns so a stale external
/// callback cannot resolve a re-run job — this matches what [`retry`] and
/// [`bulk_retry`] do for the same transition (the original omitted it).
///
/// Returns the updated rows (empty when nothing matched; not an error).
pub async fn retry_failed_by_kind<'e, E>(executor: E, kind: &str) -> Result<Vec<JobRow>, AwaError>
where
    E: PgExecutor<'e>,
{
    let rows = sqlx::query_as::<_, JobRow>(
        r#"
        UPDATE awa.jobs
        SET state = 'available', attempt = 0, run_at = now(),
            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
            callback_id = NULL, callback_timeout_at = NULL,
            callback_filter = NULL, callback_on_complete = NULL,
            callback_on_fail = NULL, callback_transform = NULL
        WHERE kind = $1 AND state = 'failed'
        RETURNING *
        "#,
    )
    .bind(kind)
    .fetch_all(executor)
    .await?;
    Ok(rows)
}
/// Re-enqueue every `failed` job in the given queue.
///
/// Resets each job to `available` with attempt 0 and immediate `run_at`.
/// Also clears the callback bookkeeping columns so a stale external
/// callback cannot resolve a re-run job — consistent with [`retry`] and
/// [`bulk_retry`] (the original omitted this).
///
/// Returns the updated rows (empty when nothing matched; not an error).
pub async fn retry_failed_by_queue<'e, E>(executor: E, queue: &str) -> Result<Vec<JobRow>, AwaError>
where
    E: PgExecutor<'e>,
{
    let rows = sqlx::query_as::<_, JobRow>(
        r#"
        UPDATE awa.jobs
        SET state = 'available', attempt = 0, run_at = now(),
            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
            callback_id = NULL, callback_timeout_at = NULL,
            callback_filter = NULL, callback_on_complete = NULL,
            callback_on_fail = NULL, callback_transform = NULL
        WHERE queue = $1 AND state = 'failed'
        RETURNING *
        "#,
    )
    .bind(queue)
    .fetch_all(executor)
    .await?;
    Ok(rows)
}
/// Permanently delete all `failed` jobs of the given kind.
///
/// Returns the number of rows removed. This is destructive — the jobs are
/// deleted outright, not cancelled.
pub async fn discard_failed<'e, E>(executor: E, kind: &str) -> Result<u64, AwaError>
where
    E: PgExecutor<'e>,
{
    let deleted = sqlx::query("DELETE FROM awa.jobs WHERE kind = $1 AND state = 'failed'")
        .bind(kind)
        .execute(executor)
        .await?;
    Ok(deleted.rows_affected())
}
/// Mark a queue as paused, recording when and (optionally) by whom.
///
/// Upserts into `awa.queue_meta`: a fresh row is created for a queue seen
/// for the first time, and an existing row has its pause fields refreshed.
pub async fn pause_queue<'e, E>(
    executor: E,
    queue: &str,
    paused_by: Option<&str>,
) -> Result<(), AwaError>
where
    E: PgExecutor<'e>,
{
    let upsert = sqlx::query(
        r#"
        INSERT INTO awa.queue_meta (queue, paused, paused_at, paused_by)
        VALUES ($1, TRUE, now(), $2)
        ON CONFLICT (queue) DO UPDATE SET paused = TRUE, paused_at = now(), paused_by = $2
        "#,
    );
    upsert.bind(queue).bind(paused_by).execute(executor).await?;
    Ok(())
}
/// Clear the paused flag for a queue.
///
/// `paused_at`/`paused_by` are left untouched — NOTE(review): they then
/// record the most recent pause; confirm that retention is intentional.
/// A queue with no `queue_meta` row is a silent no-op.
pub async fn resume_queue<'e, E>(executor: E, queue: &str) -> Result<(), AwaError>
where
    E: PgExecutor<'e>,
{
    let unpause = sqlx::query("UPDATE awa.queue_meta SET paused = FALSE WHERE queue = $1");
    unpause.bind(queue).execute(executor).await?;
    Ok(())
}
/// Cancel every not-yet-running job in a queue.
///
/// Jobs in `available`, `scheduled`, `retryable`, or `waiting_external`
/// state are moved to `cancelled` with callback columns cleared; `running`
/// jobs are deliberately left alone. Returns the number of jobs cancelled.
pub async fn drain_queue<'e, E>(executor: E, queue: &str) -> Result<u64, AwaError>
where
    E: PgExecutor<'e>,
{
    let drained = sqlx::query(
        r#"
        UPDATE awa.jobs
        SET state = 'cancelled', finalized_at = now(),
            callback_id = NULL, callback_timeout_at = NULL,
            callback_filter = NULL, callback_on_complete = NULL,
            callback_on_fail = NULL, callback_transform = NULL
        WHERE queue = $1 AND state IN ('available', 'scheduled', 'retryable', 'waiting_external')
        "#,
    )
    .bind(queue)
    .execute(executor)
    .await?;
    Ok(drained.rows_affected())
}
/// Point-in-time statistics for a single queue, as produced by [`queue_stats`].
#[derive(Debug, Clone, Serialize)]
pub struct QueueStats {
    pub queue: String,
    /// Sum of scheduled + available + running + retryable + waiting_external
    /// (terminal states excluded).
    pub total_queued: i64,
    pub scheduled: i64,
    pub available: i64,
    pub retryable: i64,
    pub running: i64,
    pub failed: i64,
    pub waiting_external: i64,
    /// Jobs finalized as `completed` within the last hour.
    pub completed_last_hour: i64,
    /// Age in seconds of the oldest `available` job; `None` when the queue
    /// has no available jobs.
    pub lag_seconds: Option<f64>,
    pub paused: bool,
}
/// A queue's rate-limit settings as reported in a runtime snapshot.
/// NOTE(review): units of `max_rate` (presumably jobs/second) are defined by
/// the producer of the snapshot — confirm there.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct RateLimitSnapshot {
    pub max_rate: f64,
    pub burst: u32,
}
/// How a queue's worker capacity is allocated by the runtime.
/// Serialized in snake_case (`hard_reserved` / `weighted`).
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Hash)]
#[serde(rename_all = "snake_case")]
pub enum QueueRuntimeMode {
    HardReserved,
    /// Shared-capacity mode; overflow-held accounting in
    /// [`queue_runtime_summary`] only applies to this variant.
    Weighted,
}
/// Effective configuration of one queue on one runtime instance.
///
/// Compared with `PartialEq` across instances to detect config drift
/// (see `config_mismatch` in [`QueueRuntimeSummary`]).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueueRuntimeConfigSnapshot {
    pub mode: QueueRuntimeMode,
    pub max_workers: Option<u32>,
    pub min_workers: Option<u32>,
    /// Relative share for `Weighted` mode; presumably unused for
    /// `HardReserved` — confirm with the runtime scheduler.
    pub weight: Option<u32>,
    pub global_max_workers: Option<u32>,
    pub poll_interval_ms: u64,
    pub deadline_duration_secs: u64,
    pub priority_aging_interval_secs: u64,
    pub rate_limit: Option<RateLimitSnapshot>,
}
/// One queue's live state as reported by a single runtime instance.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct QueueRuntimeSnapshot {
    pub queue: String,
    /// Jobs currently being executed by this instance for this queue.
    pub in_flight: u32,
    /// Jobs held back by overflow control; `None` when not applicable.
    pub overflow_held: Option<u32>,
    pub config: QueueRuntimeConfigSnapshot,
}
/// Payload an instance periodically reports via [`upsert_runtime_snapshot`].
///
/// `last_seen_at` is intentionally absent: the upsert stamps it server-side
/// with `now()`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeSnapshotInput {
    pub instance_id: Uuid,
    pub hostname: Option<String>,
    pub pid: i32,
    pub version: String,
    pub started_at: DateTime<Utc>,
    /// How often this instance promises to refresh its snapshot; drives
    /// staleness detection (see `RuntimeInstance::stale_cutoff`).
    pub snapshot_interval_ms: i64,
    pub healthy: bool,
    pub postgres_connected: bool,
    pub poll_loop_alive: bool,
    pub heartbeat_alive: bool,
    pub maintenance_alive: bool,
    pub shutting_down: bool,
    pub leader: bool,
    pub global_max_workers: Option<u32>,
    pub queues: Vec<QueueRuntimeSnapshot>,
}
/// A runtime instance row as read back from the database, plus the derived
/// `stale` flag (computed in `from_db_row`, not stored).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeInstance {
    pub instance_id: Uuid,
    pub hostname: Option<String>,
    pub pid: i32,
    pub version: String,
    pub started_at: DateTime<Utc>,
    pub last_seen_at: DateTime<Utc>,
    pub snapshot_interval_ms: i64,
    /// True when `last_seen_at` is older than the staleness cutoff
    /// (3 × snapshot interval, at least 30s).
    pub stale: bool,
    pub healthy: bool,
    pub postgres_connected: bool,
    pub poll_loop_alive: bool,
    pub heartbeat_alive: bool,
    pub maintenance_alive: bool,
    pub shutting_down: bool,
    pub leader: bool,
    pub global_max_workers: Option<u32>,
    pub queues: Vec<QueueRuntimeSnapshot>,
}
impl RuntimeInstance {
    /// Staleness window after `last_seen_at`: three snapshot intervals,
    /// never less than 30 seconds, with the interval itself floored at 1s.
    fn stale_cutoff(interval_ms: i64) -> Duration {
        let clamped_ms = max(interval_ms, 1_000);
        let cutoff_ms = max(clamped_ms.saturating_mul(3), 30_000);
        Duration::milliseconds(cutoff_ms)
    }
    /// Build the public shape from a raw DB row, deriving `stale` relative
    /// to `now` and widening `global_max_workers` from the DB's i32.
    fn from_db_row(row: RuntimeInstanceRow, now: DateTime<Utc>) -> Self {
        let deadline = row.last_seen_at + Self::stale_cutoff(row.snapshot_interval_ms);
        Self {
            instance_id: row.instance_id,
            hostname: row.hostname,
            pid: row.pid,
            version: row.version,
            started_at: row.started_at,
            last_seen_at: row.last_seen_at,
            snapshot_interval_ms: row.snapshot_interval_ms,
            stale: now > deadline,
            healthy: row.healthy,
            postgres_connected: row.postgres_connected,
            poll_loop_alive: row.poll_loop_alive,
            heartbeat_alive: row.heartbeat_alive,
            maintenance_alive: row.maintenance_alive,
            shutting_down: row.shutting_down,
            leader: row.leader,
            global_max_workers: row.global_max_workers.map(|v| v as u32),
            queues: row.queues.0,
        }
    }
}
/// Fleet-wide roll-up of all registered runtime instances.
/// `live`/`healthy`/`leader` counts exclude stale instances.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RuntimeOverview {
    pub total_instances: usize,
    pub live_instances: usize,
    pub stale_instances: usize,
    pub healthy_instances: usize,
    pub leader_instances: usize,
    pub instances: Vec<RuntimeInstance>,
}
/// Per-queue aggregation across runtime instances (see
/// [`queue_runtime_summary`]).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueRuntimeSummary {
    pub queue: String,
    pub instance_count: usize,
    pub live_instances: usize,
    pub stale_instances: usize,
    pub healthy_instances: usize,
    /// Sum of `in_flight` over live instances only.
    pub total_in_flight: u64,
    /// Sum of `overflow_held` over live instances; `None` unless the
    /// representative config is `Weighted` mode.
    pub overflow_held_total: Option<u64>,
    /// True when instances disagree on the queue's config.
    pub config_mismatch: bool,
    /// Representative config (first live instance's, falling back to stale).
    pub config: Option<QueueRuntimeConfigSnapshot>,
}
/// Raw `awa.runtime_instances` row; converted to [`RuntimeInstance`] via
/// `RuntimeInstance::from_db_row`. `queues` is stored as JSONB and decoded
/// through sqlx's `Json` wrapper.
#[derive(Debug, sqlx::FromRow)]
struct RuntimeInstanceRow {
    instance_id: Uuid,
    hostname: Option<String>,
    pid: i32,
    version: String,
    started_at: DateTime<Utc>,
    last_seen_at: DateTime<Utc>,
    snapshot_interval_ms: i64,
    healthy: bool,
    postgres_connected: bool,
    poll_loop_alive: bool,
    heartbeat_alive: bool,
    maintenance_alive: bool,
    shutting_down: bool,
    leader: bool,
    global_max_workers: Option<i32>,
    queues: Json<Vec<QueueRuntimeSnapshot>>,
}
/// Insert or refresh this process's row in `awa.runtime_instances`.
///
/// `last_seen_at` is stamped server-side with `now()` on both insert and
/// conflict-update, which is why it is not a bound parameter.
pub async fn upsert_runtime_snapshot<'e, E>(
    executor: E,
    snapshot: &RuntimeSnapshotInput,
) -> Result<(), AwaError>
where
    E: PgExecutor<'e>,
{
    sqlx::query(
        r#"
        INSERT INTO awa.runtime_instances (
            instance_id,
            hostname,
            pid,
            version,
            started_at,
            last_seen_at,
            snapshot_interval_ms,
            healthy,
            postgres_connected,
            poll_loop_alive,
            heartbeat_alive,
            maintenance_alive,
            shutting_down,
            leader,
            global_max_workers,
            queues
        )
        VALUES (
            $1, $2, $3, $4, $5, now(), $6, $7, $8, $9, $10, $11, $12, $13, $14, $15
        )
        ON CONFLICT (instance_id) DO UPDATE SET
            hostname = EXCLUDED.hostname,
            pid = EXCLUDED.pid,
            version = EXCLUDED.version,
            started_at = EXCLUDED.started_at,
            last_seen_at = now(),
            snapshot_interval_ms = EXCLUDED.snapshot_interval_ms,
            healthy = EXCLUDED.healthy,
            postgres_connected = EXCLUDED.postgres_connected,
            poll_loop_alive = EXCLUDED.poll_loop_alive,
            heartbeat_alive = EXCLUDED.heartbeat_alive,
            maintenance_alive = EXCLUDED.maintenance_alive,
            shutting_down = EXCLUDED.shutting_down,
            leader = EXCLUDED.leader,
            global_max_workers = EXCLUDED.global_max_workers,
            queues = EXCLUDED.queues
        "#,
    )
    // Bind order must track the VALUES placeholder list ($1..$15); note
    // $6 onward are shifted by one because last_seen_at uses now().
    .bind(snapshot.instance_id)
    .bind(snapshot.hostname.as_deref())
    .bind(snapshot.pid)
    .bind(&snapshot.version)
    .bind(snapshot.started_at)
    .bind(snapshot.snapshot_interval_ms)
    .bind(snapshot.healthy)
    .bind(snapshot.postgres_connected)
    .bind(snapshot.poll_loop_alive)
    .bind(snapshot.heartbeat_alive)
    .bind(snapshot.maintenance_alive)
    .bind(snapshot.shutting_down)
    .bind(snapshot.leader)
    // Narrow to i32 for the DB column; values above i32::MAX would wrap —
    // NOTE(review): confirm callers bound global_max_workers accordingly.
    .bind(snapshot.global_max_workers.map(|v| v as i32))
    // Serialize the queue snapshots to JSONB via the sqlx Json wrapper.
    .bind(Json(&snapshot.queues))
    .execute(executor)
    .await?;
    Ok(())
}
/// Delete runtime-instance rows that have not been refreshed within
/// `max_age`.
///
/// The age is clamped to at least one second so a zero or negative
/// duration cannot wipe every row. Returns the number of rows removed.
pub async fn cleanup_runtime_snapshots<'e, E>(
    executor: E,
    max_age: Duration,
) -> Result<u64, AwaError>
where
    E: PgExecutor<'e>,
{
    let age_secs = max(max_age.num_seconds(), 1);
    let deleted = sqlx::query(
        "DELETE FROM awa.runtime_instances WHERE last_seen_at < now() - make_interval(secs => $1)",
    )
    .bind(age_secs)
    .execute(executor)
    .await?;
    Ok(deleted.rows_affected())
}
/// Fetch every registered runtime instance, leaders first, most recently
/// seen first within that.
///
/// The `stale` flag is derived client-side against a single `Utc::now()`
/// so all rows in one call share the same reference time.
pub async fn list_runtime_instances<'e, E>(executor: E) -> Result<Vec<RuntimeInstance>, AwaError>
where
    E: PgExecutor<'e>,
{
    let rows = sqlx::query_as::<_, RuntimeInstanceRow>(
        r#"
        SELECT
            instance_id,
            hostname,
            pid,
            version,
            started_at,
            last_seen_at,
            snapshot_interval_ms,
            healthy,
            postgres_connected,
            poll_loop_alive,
            heartbeat_alive,
            maintenance_alive,
            shutting_down,
            leader,
            global_max_workers,
            queues
        FROM awa.runtime_instances
        ORDER BY leader DESC, last_seen_at DESC, started_at DESC
        "#,
    )
    .fetch_all(executor)
    .await?;
    let now = Utc::now();
    let mut instances = Vec::with_capacity(rows.len());
    for row in rows {
        instances.push(RuntimeInstance::from_db_row(row, now));
    }
    Ok(instances)
}
/// Build a fleet-wide overview from all registered runtime instances.
///
/// Stale instances are counted separately; the healthy and leader tallies
/// only consider live (non-stale) instances.
pub async fn runtime_overview<'e, E>(executor: E) -> Result<RuntimeOverview, AwaError>
where
    E: PgExecutor<'e>,
{
    let instances = list_runtime_instances(executor).await?;
    let mut stale_instances = 0usize;
    let mut healthy_instances = 0usize;
    let mut leader_instances = 0usize;
    for inst in &instances {
        if inst.stale {
            stale_instances += 1;
            continue;
        }
        if inst.healthy {
            healthy_instances += 1;
        }
        if inst.leader {
            leader_instances += 1;
        }
    }
    let total_instances = instances.len();
    Ok(RuntimeOverview {
        total_instances,
        live_instances: total_instances - stale_instances,
        stale_instances,
        healthy_instances,
        leader_instances,
        instances,
    })
}
/// Aggregate per-queue worker state across all registered runtime
/// instances, sorted by queue name.
///
/// For each queue: counts contributing instances, sums live in-flight and
/// overflow work, picks a representative config, and flags config drift.
pub async fn queue_runtime_summary<'e, E>(executor: E) -> Result<Vec<QueueRuntimeSummary>, AwaError>
where
    E: PgExecutor<'e>,
{
    let instances = list_runtime_instances(executor).await?;
    // queue name -> (is_live, is_healthy, snapshot) per contributing instance.
    let mut by_queue: HashMap<String, Vec<(bool, bool, QueueRuntimeSnapshot)>> = HashMap::new();
    for instance in instances {
        let is_live = !instance.stale;
        // "healthy" implies "live": a stale instance never counts as healthy.
        let is_healthy = is_live && instance.healthy;
        for queue in instance.queues {
            by_queue
                .entry(queue.queue.clone())
                .or_default()
                .push((is_live, is_healthy, queue));
        }
    }
    let mut summaries: Vec<_> = by_queue
        .into_iter()
        .map(|(queue, entries)| {
            let instance_count = entries.len();
            let live_instances = entries.iter().filter(|(live, _, _)| *live).count();
            let stale_instances = instance_count.saturating_sub(live_instances);
            let healthy_instances = entries.iter().filter(|(_, healthy, _)| *healthy).count();
            // Work totals consider live instances only; stale snapshots
            // would count work that has likely moved elsewhere.
            let total_in_flight = entries
                .iter()
                .filter(|(live, _, _)| *live)
                .map(|(_, _, queue)| u64::from(queue.in_flight))
                .sum();
            let overflow_total: u64 = entries
                .iter()
                .filter(|(live, _, _)| *live)
                .filter_map(|(_, _, queue)| queue.overflow_held.map(u64::from))
                .sum();
            // Prefer configs from live instances; fall back to stale ones so
            // a fully-stale queue still reports *some* config.
            let live_configs: Vec<_> = entries
                .iter()
                .filter(|(live, _, _)| *live)
                .map(|(_, _, queue)| queue.config.clone())
                .collect();
            let config_candidates = if live_configs.is_empty() {
                entries
                    .iter()
                    .map(|(_, _, queue)| queue.config.clone())
                    .collect::<Vec<_>>()
            } else {
                live_configs
            };
            let config = config_candidates.first().cloned();
            // Drift: any remaining candidate differing from the first.
            let config_mismatch = config_candidates
                .iter()
                .skip(1)
                .any(|candidate| Some(candidate) != config.as_ref());
            QueueRuntimeSummary {
                queue,
                instance_count,
                live_instances,
                stale_instances,
                healthy_instances,
                total_in_flight,
                // Overflow totals are only meaningful for weighted queues.
                overflow_held_total: config
                    .as_ref()
                    .filter(|cfg| cfg.mode == QueueRuntimeMode::Weighted)
                    .map(|_| overflow_total),
                config_mismatch,
                config,
            }
        })
        .collect();
    // HashMap iteration order is arbitrary; sort for deterministic output.
    summaries.sort_by(|a, b| a.queue.cmp(&b.queue));
    Ok(summaries)
}
/// Collect per-queue statistics, ordered by queue name.
///
/// Combines the pre-aggregated `awa.queue_state_counts` with two on-the-fly
/// CTEs over `awa.jobs_hot` (oldest-available lag and completions in the
/// last hour) plus the pause flag from `awa.queue_meta`.
pub async fn queue_stats<'e, E>(executor: E) -> Result<Vec<QueueStats>, AwaError>
where
    E: PgExecutor<'e>,
{
    // Decoded as a positional tuple; order must match the SELECT list.
    let rows = sqlx::query_as::<
        _,
        (
            String,
            i64,
            i64,
            i64,
            i64,
            i64,
            i64,
            i64,
            i64,
            Option<f64>,
            bool,
        ),
    >(
        r#"
        WITH available_lag AS (
            SELECT
                queue,
                EXTRACT(EPOCH FROM (now() - min(run_at)))::float8 AS lag_seconds
            FROM awa.jobs_hot
            WHERE state = 'available'
            GROUP BY queue
        ),
        completed_recent AS (
            SELECT
                queue,
                count(*)::bigint AS completed_last_hour
            FROM awa.jobs_hot
            WHERE state = 'completed'
              AND finalized_at > now() - interval '1 hour'
            GROUP BY queue
        )
        SELECT
            qs.queue,
            qs.scheduled + qs.available + qs.running + qs.retryable + qs.waiting_external AS total_queued,
            qs.scheduled,
            qs.available,
            qs.retryable,
            qs.running,
            qs.failed,
            qs.waiting_external,
            COALESCE(cr.completed_last_hour, 0) AS completed_last_hour,
            al.lag_seconds,
            COALESCE(qm.paused, FALSE) AS paused
        FROM awa.queue_state_counts qs
        LEFT JOIN available_lag al ON al.queue = qs.queue
        LEFT JOIN completed_recent cr ON cr.queue = qs.queue
        LEFT JOIN awa.queue_meta qm ON qm.queue = qs.queue
        ORDER BY qs.queue
        "#,
    )
    .fetch_all(executor)
    .await?;
    // Re-shape the raw tuples into the named struct.
    Ok(rows
        .into_iter()
        .map(
            |(
                queue,
                total_queued,
                scheduled,
                available,
                retryable,
                running,
                failed,
                waiting_external,
                completed_last_hour,
                lag_seconds,
                paused,
            )| QueueStats {
                queue,
                total_queued,
                scheduled,
                available,
                retryable,
                running,
                failed,
                waiting_external,
                completed_last_hour,
                lag_seconds,
                paused,
            },
        )
        .collect())
}
/// Optional filters for [`list_jobs`]. All fields are ANDed; `None` means
/// "no constraint" for that field.
#[derive(Debug, Clone, Default, Serialize)]
pub struct ListJobsFilter {
    pub state: Option<JobState>,
    pub kind: Option<String>,
    pub queue: Option<String>,
    /// Matches jobs whose `tags` array contains this tag.
    pub tag: Option<String>,
    /// Keyset pagination: only jobs with id strictly below this.
    pub before_id: Option<i64>,
    /// Page size; [`list_jobs`] defaults this to 100 when unset.
    pub limit: Option<i64>,
}
/// Page through `awa.jobs` newest-first with the given optional filters.
///
/// `before_id` provides keyset pagination (strictly older rows only);
/// `limit` defaults to 100 when unset. Each `$n IS NULL OR ...` clause
/// makes the corresponding filter optional at the SQL level.
pub async fn list_jobs<'e, E>(executor: E, filter: &ListJobsFilter) -> Result<Vec<JobRow>, AwaError>
where
    E: PgExecutor<'e>,
{
    let page_size = filter.limit.unwrap_or(100);
    Ok(sqlx::query_as::<_, JobRow>(
        r#"
        SELECT * FROM awa.jobs
        WHERE ($1::awa.job_state IS NULL OR state = $1)
          AND ($2::text IS NULL OR kind = $2)
          AND ($3::text IS NULL OR queue = $3)
          AND ($4::text IS NULL OR tags @> ARRAY[$4]::text[])
          AND ($5::bigint IS NULL OR id < $5)
        ORDER BY id DESC
        LIMIT $6
        "#,
    )
    .bind(filter.state)
    .bind(&filter.kind)
    .bind(&filter.queue)
    .bind(&filter.tag)
    .bind(filter.before_id)
    .bind(page_size)
    .fetch_all(executor)
    .await?)
}
/// Fetch a single job by id, or `JobNotFound` when it does not exist.
pub async fn get_job<'e, E>(executor: E, job_id: i64) -> Result<JobRow, AwaError>
where
    E: PgExecutor<'e>,
{
    sqlx::query_as::<_, JobRow>("SELECT * FROM awa.jobs WHERE id = $1")
        .bind(job_id)
        .fetch_optional(executor)
        .await?
        .ok_or(AwaError::JobNotFound { id: job_id })
}
/// Global job counts per state, read from the pre-aggregated
/// `awa.queue_state_counts` table (one UNION branch per state).
///
/// Every state appears in the result map; queues with no jobs contribute 0
/// via the COALESCE over the empty SUM.
pub async fn state_counts<'e, E>(executor: E) -> Result<HashMap<JobState, i64>, AwaError>
where
    E: PgExecutor<'e>,
{
    let rows = sqlx::query_as::<_, (JobState, i64)>(
        r#"
        SELECT 'scheduled'::awa.job_state, COALESCE(sum(scheduled), 0)::bigint FROM awa.queue_state_counts
        UNION ALL
        SELECT 'available'::awa.job_state, COALESCE(sum(available), 0)::bigint FROM awa.queue_state_counts
        UNION ALL
        SELECT 'running'::awa.job_state, COALESCE(sum(running), 0)::bigint FROM awa.queue_state_counts
        UNION ALL
        SELECT 'completed'::awa.job_state, COALESCE(sum(completed), 0)::bigint FROM awa.queue_state_counts
        UNION ALL
        SELECT 'retryable'::awa.job_state, COALESCE(sum(retryable), 0)::bigint FROM awa.queue_state_counts
        UNION ALL
        SELECT 'failed'::awa.job_state, COALESCE(sum(failed), 0)::bigint FROM awa.queue_state_counts
        UNION ALL
        SELECT 'cancelled'::awa.job_state, COALESCE(sum(cancelled), 0)::bigint FROM awa.queue_state_counts
        UNION ALL
        SELECT 'waiting_external'::awa.job_state, COALESCE(sum(waiting_external), 0)::bigint FROM awa.queue_state_counts
        "#,
    )
    .fetch_all(executor)
    .await?;
    // (state, count) pairs collect directly into the map.
    Ok(rows.into_iter().collect())
}
/// List job kinds currently referenced by at least one job, sorted.
/// Backed by the `awa.job_kind_catalog` reference-count table.
pub async fn distinct_kinds<'e, E>(executor: E) -> Result<Vec<String>, AwaError>
where
    E: PgExecutor<'e>,
{
    Ok(sqlx::query_scalar::<_, String>(
        "SELECT kind FROM awa.job_kind_catalog WHERE ref_count > 0 ORDER BY kind",
    )
    .fetch_all(executor)
    .await?)
}
/// List queues currently referenced by at least one job, sorted.
/// Backed by the `awa.job_queue_catalog` reference-count table.
pub async fn distinct_queues<'e, E>(executor: E) -> Result<Vec<String>, AwaError>
where
    E: PgExecutor<'e>,
{
    Ok(sqlx::query_scalar::<_, String>(
        "SELECT queue FROM awa.job_queue_catalog WHERE ref_count > 0 ORDER BY queue",
    )
    .fetch_all(executor)
    .await?)
}
/// Re-enqueue a batch of jobs by id.
///
/// Same transition as [`retry`] (back to `available`, attempt 0, callback
/// columns cleared) but over `id = ANY($1)`. Ids that are missing or in an
/// ineligible state are silently skipped; the returned rows are the jobs
/// actually updated.
pub async fn bulk_retry<'e, E>(executor: E, ids: &[i64]) -> Result<Vec<JobRow>, AwaError>
where
    E: PgExecutor<'e>,
{
    Ok(sqlx::query_as::<_, JobRow>(
        r#"
        UPDATE awa.jobs
        SET state = 'available', attempt = 0, run_at = now(),
            finalized_at = NULL, heartbeat_at = NULL, deadline_at = NULL,
            callback_id = NULL, callback_timeout_at = NULL,
            callback_filter = NULL, callback_on_complete = NULL,
            callback_on_fail = NULL, callback_transform = NULL
        WHERE id = ANY($1) AND state IN ('failed', 'cancelled', 'waiting_external')
        RETURNING *
        "#,
    )
    .bind(ids)
    .fetch_all(executor)
    .await?)
}
/// Cancel a batch of jobs by id.
///
/// Same transition as [`cancel`] but over `id = ANY($1)`. Ids that are
/// missing or already terminal are silently skipped; the returned rows are
/// the jobs actually cancelled.
pub async fn bulk_cancel<'e, E>(executor: E, ids: &[i64]) -> Result<Vec<JobRow>, AwaError>
where
    E: PgExecutor<'e>,
{
    Ok(sqlx::query_as::<_, JobRow>(
        r#"
        UPDATE awa.jobs
        SET state = 'cancelled', finalized_at = now(),
            callback_id = NULL, callback_timeout_at = NULL,
            callback_filter = NULL, callback_on_complete = NULL,
            callback_on_fail = NULL, callback_transform = NULL
        WHERE id = ANY($1) AND state NOT IN ('completed', 'failed', 'cancelled')
        RETURNING *
        "#,
    )
    .bind(ids)
    .fetch_all(executor)
    .await?)
}
#[derive(Debug, Clone, Serialize)]
pub struct StateTimeseriesBucket {
pub bucket: chrono::DateTime<chrono::Utc>,
pub state: JobState,
pub count: i64,
}
pub async fn state_timeseries<'e, E>(
executor: E,
minutes: i32,
) -> Result<Vec<StateTimeseriesBucket>, AwaError>
where
E: PgExecutor<'e>,
{
let rows = sqlx::query_as::<_, (chrono::DateTime<chrono::Utc>, JobState, i64)>(
r#"
SELECT
date_trunc('minute', created_at) AS bucket,
state,
count(*) AS count
FROM awa.jobs
WHERE created_at >= now() - make_interval(mins => $1)
GROUP BY bucket, state
ORDER BY bucket
"#,
)
.bind(minutes)
.fetch_all(executor)
.await?;
Ok(rows
.into_iter()
.map(|(bucket, state, count)| StateTimeseriesBucket {
bucket,
state,
count,
})
.collect())
}
/// Attach a fresh callback id and timeout to a running job, clearing any
/// previously configured CEL expression columns.
///
/// The update only matches while the job is `running` AND still holds the
/// caller's `run_lease`; a zero-row result therefore covers a lease
/// mismatch as well as a state change, though the error text only mentions
/// the state.
pub async fn register_callback<'e, E>(
    executor: E,
    job_id: i64,
    run_lease: i64,
    timeout: std::time::Duration,
) -> Result<Uuid, AwaError>
where
    E: PgExecutor<'e>,
{
    let callback_id = Uuid::new_v4();
    let timeout_secs = timeout.as_secs_f64();
    let updated = sqlx::query(
        r#"UPDATE awa.jobs
        SET callback_id = $2,
            callback_timeout_at = now() + make_interval(secs => $3),
            callback_filter = NULL,
            callback_on_complete = NULL,
            callback_on_fail = NULL,
            callback_transform = NULL
        WHERE id = $1 AND state = 'running' AND run_lease = $4"#,
    )
    .bind(job_id)
    .bind(callback_id)
    .bind(timeout_secs)
    .bind(run_lease)
    .execute(executor)
    .await?;
    match updated.rows_affected() {
        0 => Err(AwaError::Validation("job is not in running state".into())),
        _ => Ok(callback_id),
    }
}
/// Resolve an external callback by completing its job unconditionally
/// (no CEL evaluation — compare [`resolve_callback`]).
///
/// Matches jobs in `waiting_external` or `running` state and wipes all
/// callback/heartbeat/deadline/progress bookkeeping. `_payload` is
/// currently unused but kept for interface stability.
pub async fn complete_external<'e, E>(
    executor: E,
    callback_id: Uuid,
    _payload: Option<serde_json::Value>,
) -> Result<JobRow, AwaError>
where
    E: PgExecutor<'e>,
{
    let completed = sqlx::query_as::<_, JobRow>(
        r#"
        UPDATE awa.jobs
        SET state = 'completed',
            finalized_at = now(),
            callback_id = NULL,
            callback_timeout_at = NULL,
            callback_filter = NULL,
            callback_on_complete = NULL,
            callback_on_fail = NULL,
            callback_transform = NULL,
            heartbeat_at = NULL,
            deadline_at = NULL,
            progress = NULL
        WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
        RETURNING *
        "#,
    )
    .bind(callback_id)
    .fetch_optional(executor)
    .await?;
    completed.ok_or_else(|| AwaError::CallbackNotFound {
        callback_id: callback_id.to_string(),
    })
}
/// Resolve an external callback by failing its job unconditionally.
///
/// Appends an error entry (message, attempt, timestamp) to the job's
/// `errors` JSONB array and wipes callback/heartbeat/deadline bookkeeping.
/// Matches jobs in `waiting_external` or `running` state.
pub async fn fail_external<'e, E>(
    executor: E,
    callback_id: Uuid,
    error: &str,
) -> Result<JobRow, AwaError>
where
    E: PgExecutor<'e>,
{
    let failed = sqlx::query_as::<_, JobRow>(
        r#"
        UPDATE awa.jobs
        SET state = 'failed',
            finalized_at = now(),
            callback_id = NULL,
            callback_timeout_at = NULL,
            callback_filter = NULL,
            callback_on_complete = NULL,
            callback_on_fail = NULL,
            callback_transform = NULL,
            heartbeat_at = NULL,
            deadline_at = NULL,
            errors = errors || jsonb_build_object(
                'error', $2::text,
                'attempt', attempt,
                'at', now()
            )::jsonb
        WHERE callback_id = $1 AND state IN ('waiting_external', 'running')
        RETURNING *
        "#,
    )
    .bind(callback_id)
    .bind(error)
    .fetch_optional(executor)
    .await?;
    failed.ok_or_else(|| AwaError::CallbackNotFound {
        callback_id: callback_id.to_string(),
    })
}
/// Resolve an external callback by re-enqueueing its job from scratch.
///
/// Only matches `waiting_external` jobs (unlike the complete/fail
/// variants, which also accept `running`). The job goes back to
/// `available` with attempt 0 and all callback bookkeeping cleared.
pub async fn retry_external<'e, E>(executor: E, callback_id: Uuid) -> Result<JobRow, AwaError>
where
    E: PgExecutor<'e>,
{
    let requeued = sqlx::query_as::<_, JobRow>(
        r#"
        UPDATE awa.jobs
        SET state = 'available',
            attempt = 0,
            run_at = now(),
            finalized_at = NULL,
            callback_id = NULL,
            callback_timeout_at = NULL,
            callback_filter = NULL,
            callback_on_complete = NULL,
            callback_on_fail = NULL,
            callback_transform = NULL,
            heartbeat_at = NULL,
            deadline_at = NULL
        WHERE callback_id = $1 AND state = 'waiting_external'
        RETURNING *
        "#,
    )
    .bind(callback_id)
    .fetch_optional(executor)
    .await?;
    requeued.ok_or_else(|| AwaError::CallbackNotFound {
        callback_id: callback_id.to_string(),
    })
}
/// Optional CEL expressions attached to an external callback. Each
/// expression sees a single variable, `payload` (see
/// [`register_callback_with_config`] for validation).
#[derive(Debug, Clone, Default)]
pub struct CallbackConfig {
    /// Boolean guard; a false result ignores the callback entirely.
    pub filter: Option<String>,
    /// Boolean; true completes the job.
    pub on_complete: Option<String>,
    /// Boolean; true fails the job (checked before on_complete).
    pub on_fail: Option<String>,
    /// Rewrites the payload on completion.
    pub transform: Option<String>,
}
impl CallbackConfig {
    /// True when no CEL expression is configured at all.
    pub fn is_empty(&self) -> bool {
        [&self.filter, &self.on_complete, &self.on_fail, &self.transform]
            .iter()
            .all(|slot| slot.is_none())
    }
}
/// What [`resolve_callback`] does when no CEL expression (or none that
/// evaluated successfully) produces a decision.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DefaultAction {
    Complete,
    Fail,
    Ignore,
}
/// Result of [`resolve_callback`].
#[derive(Debug)]
pub enum ResolveOutcome {
    /// The job was completed; `payload` is the (possibly transformed)
    /// callback payload.
    Completed {
        payload: Option<serde_json::Value>,
        job: JobRow,
    },
    /// The job was failed.
    Failed {
        job: JobRow,
    },
    /// Nothing was changed (filter rejected, or default action was Ignore).
    Ignored {
        reason: String,
    },
}
impl ResolveOutcome {
    /// True when the callback completed the job.
    pub fn is_completed(&self) -> bool {
        match self {
            ResolveOutcome::Completed { .. } => true,
            _ => false,
        }
    }
    /// True when the callback failed the job.
    pub fn is_failed(&self) -> bool {
        match self {
            ResolveOutcome::Failed { .. } => true,
            _ => false,
        }
    }
    /// True when the callback changed nothing.
    pub fn is_ignored(&self) -> bool {
        match self {
            ResolveOutcome::Ignored { .. } => true,
            _ => false,
        }
    }
}
/// Like [`register_callback`], but also stores the CEL expression columns
/// after validating them.
///
/// With the `cel` feature, every provided expression must compile and may
/// reference only the `payload` variable; otherwise a `Validation` error is
/// returned. Without the feature, any non-empty config is rejected.
///
/// The update only matches while the job is `running` and holds the
/// caller's `run_lease`; zero rows affected yields a `Validation` error.
pub async fn register_callback_with_config<'e, E>(
    executor: E,
    job_id: i64,
    run_lease: i64,
    timeout: std::time::Duration,
    config: &CallbackConfig,
) -> Result<Uuid, AwaError>
where
    E: PgExecutor<'e>,
{
    #[cfg(feature = "cel")]
    {
        // Validate each expression up-front so bad CEL is rejected at
        // registration time rather than silently at resolution time.
        for (name, expr) in [
            ("filter", &config.filter),
            ("on_complete", &config.on_complete),
            ("on_fail", &config.on_fail),
            ("transform", &config.transform),
        ] {
            if let Some(src) = expr {
                let program = cel::Program::compile(src).map_err(|e| {
                    AwaError::Validation(format!("invalid CEL expression for {name}: {e}"))
                })?;
                // Only `payload` is bound at evaluation time (see eval_bool
                // / apply_transform); reject anything else.
                let refs = program.references();
                let bad_vars: Vec<&str> = refs
                    .variables()
                    .into_iter()
                    .filter(|v| *v != "payload")
                    .collect();
                if !bad_vars.is_empty() {
                    return Err(AwaError::Validation(format!(
                        "CEL expression for {name} references undeclared variable(s): {}; \
                        only 'payload' is available",
                        bad_vars.join(", ")
                    )));
                }
            }
        }
    }
    #[cfg(not(feature = "cel"))]
    {
        if !config.is_empty() {
            return Err(AwaError::Validation(
                "CEL expressions require the 'cel' feature".into(),
            ));
        }
    }
    let callback_id = Uuid::new_v4();
    let timeout_secs = timeout.as_secs_f64();
    let result = sqlx::query(
        r#"UPDATE awa.jobs
        SET callback_id = $2,
            callback_timeout_at = now() + make_interval(secs => $3),
            callback_filter = $4,
            callback_on_complete = $5,
            callback_on_fail = $6,
            callback_transform = $7
        WHERE id = $1 AND state = 'running' AND run_lease = $8"#,
    )
    .bind(job_id)
    .bind(callback_id)
    .bind(timeout_secs)
    .bind(&config.filter)
    .bind(&config.on_complete)
    .bind(&config.on_fail)
    .bind(&config.transform)
    .bind(run_lease)
    .execute(executor)
    .await?;
    // Zero rows also covers a run_lease mismatch, not only a state change.
    if result.rows_affected() == 0 {
        return Err(AwaError::Validation("job is not in running state".into()));
    }
    Ok(callback_id)
}
/// Internal decision produced by CEL evaluation (or the default action)
/// before [`resolve_callback`] applies it to the database.
enum ResolveAction {
    /// Complete the job with this (possibly transformed) payload.
    Complete(Option<serde_json::Value>),
    /// Fail the job; `expression` records which on_fail expression matched.
    Fail {
        error: String,
        expression: Option<String>,
    },
    /// Leave the job untouched, with a human-readable reason.
    Ignore(String),
}
/// Resolve an external callback, consulting the job's stored CEL
/// expressions (when present) to decide between complete, fail, or ignore.
///
/// Runs in its own transaction: the job row is locked with `FOR UPDATE`
/// so concurrent resolutions of the same callback serialize, then the
/// chosen transition is applied and committed.
///
/// Only `waiting_external` jobs match here — NOTE(review):
/// [`complete_external`]/[`fail_external`] also accept `running`; confirm
/// the narrower predicate is intentional.
pub async fn resolve_callback(
    pool: &PgPool,
    callback_id: Uuid,
    payload: Option<serde_json::Value>,
    default_action: DefaultAction,
) -> Result<ResolveOutcome, AwaError> {
    let mut tx = pool.begin().await?;
    // Lock the row for the duration of the transaction.
    let job = sqlx::query_as::<_, JobRow>(
        "SELECT * FROM awa.jobs_hot WHERE callback_id = $1
        AND state = 'waiting_external'
        FOR UPDATE",
    )
    .bind(callback_id)
    .fetch_optional(&mut *tx)
    .await?
    .ok_or(AwaError::CallbackNotFound {
        callback_id: callback_id.to_string(),
    })?;
    // Decide complete/fail/ignore from the job's CEL expressions (or the
    // caller's default when none are configured).
    let action = evaluate_or_default(&job, &payload, default_action)?;
    match action {
        ResolveAction::Complete(transformed_payload) => {
            let completed_job = sqlx::query_as::<_, JobRow>(
                r#"
                UPDATE awa.jobs
                SET state = 'completed',
                    finalized_at = now(),
                    callback_id = NULL,
                    callback_timeout_at = NULL,
                    callback_filter = NULL,
                    callback_on_complete = NULL,
                    callback_on_fail = NULL,
                    callback_transform = NULL,
                    heartbeat_at = NULL,
                    deadline_at = NULL,
                    progress = NULL
                WHERE id = $1
                RETURNING *
                "#,
            )
            .bind(job.id)
            .fetch_one(&mut *tx)
            .await?;
            tx.commit().await?;
            Ok(ResolveOutcome::Completed {
                payload: transformed_payload,
                job: completed_job,
            })
        }
        ResolveAction::Fail { error, expression } => {
            // Build the error entry appended to the job's errors array;
            // include the matching on_fail expression when there was one.
            let mut error_json = serde_json::json!({
                "error": error,
                "attempt": job.attempt,
                "at": chrono::Utc::now().to_rfc3339(),
            });
            if let Some(expr) = expression {
                error_json["expression"] = serde_json::Value::String(expr);
            }
            let failed_job = sqlx::query_as::<_, JobRow>(
                r#"
                UPDATE awa.jobs
                SET state = 'failed',
                    finalized_at = now(),
                    callback_id = NULL,
                    callback_timeout_at = NULL,
                    callback_filter = NULL,
                    callback_on_complete = NULL,
                    callback_on_fail = NULL,
                    callback_transform = NULL,
                    heartbeat_at = NULL,
                    deadline_at = NULL,
                    errors = errors || $2::jsonb
                WHERE id = $1
                RETURNING *
                "#,
            )
            .bind(job.id)
            .bind(error_json)
            .fetch_one(&mut *tx)
            .await?;
            tx.commit().await?;
            Ok(ResolveOutcome::Failed { job: failed_job })
        }
        ResolveAction::Ignore(reason) => {
            // No commit: dropping `tx` rolls back, which is a no-op here
            // (nothing was modified) and releases the FOR UPDATE lock.
            Ok(ResolveOutcome::Ignored { reason })
        }
    }
}
/// Choose the resolve action: run CEL evaluation when the job has any
/// expression configured, otherwise fall straight through to the default.
///
/// Expressions present without the `cel` feature is an error — they were
/// stored by a build that had the feature, and silently applying the
/// default would ignore the configured behavior.
fn evaluate_or_default(
    job: &JobRow,
    payload: &Option<serde_json::Value>,
    default_action: DefaultAction,
) -> Result<ResolveAction, AwaError> {
    let has_expressions = job.callback_filter.is_some()
        || job.callback_on_complete.is_some()
        || job.callback_on_fail.is_some()
        || job.callback_transform.is_some();
    if !has_expressions {
        return Ok(apply_default(default_action, payload));
    }
    #[cfg(feature = "cel")]
    {
        Ok(evaluate_cel(job, payload, default_action))
    }
    #[cfg(not(feature = "cel"))]
    {
        // Silence unused-variable warnings in this configuration.
        let _ = (payload, default_action);
        Err(AwaError::Validation(
            "CEL expressions present but 'cel' feature is not enabled".into(),
        ))
    }
}
/// Map the caller's default action onto a concrete [`ResolveAction`],
/// passing the payload through untransformed on completion.
fn apply_default(
    default_action: DefaultAction,
    payload: &Option<serde_json::Value>,
) -> ResolveAction {
    match default_action {
        DefaultAction::Complete => ResolveAction::Complete(payload.clone()),
        DefaultAction::Fail => {
            let error = String::from("callback failed: default action");
            ResolveAction::Fail {
                error,
                expression: None,
            }
        }
        DefaultAction::Ignore => {
            let reason = String::from("no expressions configured, default is ignore");
            ResolveAction::Ignore(reason)
        }
    }
}
/// Evaluate the job's CEL expressions against the callback payload.
///
/// Order: filter (false ⇒ ignore), then on_fail (true ⇒ fail), then
/// on_complete (true ⇒ complete with optional transform). An expression
/// that errors during evaluation is treated as "no opinion" and falls
/// through (eval_bool already logs the warning); if nothing decides, the
/// caller's default action applies.
#[cfg(feature = "cel")]
fn evaluate_cel(
    job: &JobRow,
    payload: &Option<serde_json::Value>,
    default_action: DefaultAction,
) -> ResolveAction {
    // CEL sees `payload` as JSON null when the caller supplied none.
    let payload_value = payload.clone().unwrap_or(serde_json::Value::Null);
    if let Some(filter_expr) = &job.callback_filter {
        if let Ok(false) = eval_bool(filter_expr, &payload_value, job.id, "filter") {
            return ResolveAction::Ignore("filter expression returned false".to_string());
        }
    }
    if let Some(on_fail_expr) = &job.callback_on_fail {
        if let Ok(true) = eval_bool(on_fail_expr, &payload_value, job.id, "on_fail") {
            return ResolveAction::Fail {
                error: "callback failed: on_fail expression matched".to_string(),
                expression: Some(on_fail_expr.clone()),
            };
        }
    }
    if let Some(on_complete_expr) = &job.callback_on_complete {
        if let Ok(true) = eval_bool(on_complete_expr, &payload_value, job.id, "on_complete") {
            let transformed = apply_transform(job, &payload_value);
            return ResolveAction::Complete(Some(transformed));
        }
    }
    apply_default(default_action, payload)
}
/// Compile and run one CEL expression with `payload` bound, expecting a
/// boolean result.
///
/// Every failure mode — compile error, context error, non-bool result,
/// execution error — is logged with the job id and expression name and
/// collapsed into `Err(())`, which callers treat as "no opinion".
#[cfg(feature = "cel")]
fn eval_bool(
    expression: &str,
    payload_value: &serde_json::Value,
    job_id: i64,
    expression_name: &str,
) -> Result<bool, ()> {
    // Expressions are compiled per call; they were validated once at
    // registration, so failures here indicate drift or storage corruption.
    let program = match cel::Program::compile(expression) {
        Ok(p) => p,
        Err(e) => {
            tracing::warn!(
                job_id,
                expression_name,
                expression,
                error = %e,
                "CEL compilation error during evaluation"
            );
            return Err(());
        }
    };
    let mut context = cel::Context::default();
    if let Err(e) = context.add_variable("payload", payload_value.clone()) {
        tracing::warn!(
            job_id,
            expression_name,
            error = %e,
            "Failed to add payload variable to CEL context"
        );
        return Err(());
    }
    match program.execute(&context) {
        Ok(cel::Value::Bool(b)) => Ok(b),
        // A non-bool result is a misconfigured expression, not a decision.
        Ok(other) => {
            tracing::warn!(
                job_id,
                expression_name,
                expression,
                result_type = ?other.type_of(),
                "CEL expression returned non-bool"
            );
            Err(())
        }
        Err(e) => {
            tracing::warn!(
                job_id,
                expression_name,
                expression,
                error = %e,
                "CEL execution error"
            );
            Err(())
        }
    }
}
/// Run the job's optional transform expression over the payload.
///
/// Best-effort by design: any failure (compile, context, execution, or
/// JSON conversion of the result) is logged and the ORIGINAL payload is
/// returned unchanged, so a bad transform never blocks completion.
#[cfg(feature = "cel")]
fn apply_transform(job: &JobRow, payload_value: &serde_json::Value) -> serde_json::Value {
    let transform_expr = match &job.callback_transform {
        Some(expr) => expr,
        // No transform configured: pass the payload through as-is.
        None => return payload_value.clone(),
    };
    let program = match cel::Program::compile(transform_expr) {
        Ok(p) => p,
        Err(e) => {
            tracing::warn!(
                job_id = job.id,
                expression = transform_expr,
                error = %e,
                "CEL transform compilation error, using original payload"
            );
            return payload_value.clone();
        }
    };
    let mut context = cel::Context::default();
    if let Err(e) = context.add_variable("payload", payload_value.clone()) {
        tracing::warn!(
            job_id = job.id,
            error = %e,
            "Failed to add payload variable for transform"
        );
        return payload_value.clone();
    }
    match program.execute(&context) {
        // The CEL value must round-trip into serde_json to be stored.
        Ok(value) => match value.json() {
            Ok(json) => json,
            Err(e) => {
                tracing::warn!(
                    job_id = job.id,
                    expression = transform_expr,
                    error = %e,
                    "CEL transform result could not be converted to JSON, using original payload"
                );
                payload_value.clone()
            }
        },
        Err(e) => {
            tracing::warn!(
                job_id = job.id,
                expression = transform_expr,
                error = %e,
                "CEL transform execution error, using original payload"
            );
            payload_value.clone()
        }
    }
}