use chrono::{DateTime, Utc};
use sqlx::{
query::query, query_as::query_as, query_scalar::query_scalar, transaction::Transaction,
};
use sqlx_postgres::{PgPool, Postgres};
use crate::domain::{Job, JobKind, JobStatus, NewJob};
use crate::error::JobError;
use crate::ids::JobId;
use crate::retry::backoff_for_attempt;
/// Column list shared by every query in this module that materialises a `Job` row.
///
/// `kind` and `status` are Postgres enum columns (`job_kind` / `job_status`); they are cast
/// to `text` here so they come back as plain strings. NOTE(review): presumably `Job`'s row
/// mapping parses those strings into `JobKind` / `JobStatus` — confirm in `crate::domain`.
const JOB_COLUMNS: &str = concat!(
    "id, kind::text AS kind, payload, status::text AS status, ",
    "attempts, max_attempts, last_error, run_at, locked_at, locked_by, ",
    "cancel_requested, idempotency_key, created_at, updated_at"
);
/// Filtering and pagination options for `list`.
///
/// A `None` in `status` or `kind` means "do not filter on that column".
#[derive(Debug, Clone)]
pub struct ListFilter {
    /// Only return jobs currently in this status, if set.
    pub status: Option<JobStatus>,
    /// Only return jobs of this kind, if set.
    pub kind: Option<JobKind>,
    /// Maximum number of rows to return; `list` clamps it to `1..=200`.
    pub limit: i64,
    /// Number of rows to skip; `list` treats negative values as `0`.
    pub offset: i64,
}

impl Default for ListFilter {
    /// No filters; the first page of 50 jobs.
    fn default() -> Self {
        let (limit, offset) = (50, 0);
        ListFilter {
            kind: None,
            status: None,
            limit,
            offset,
        }
    }
}
/// What a call to `request_cancel` achieved.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum CancelOutcome {
    /// The job was `queued` or `retrying` and has been moved straight to `cancelled`.
    CancelledNow,
    /// The job is `running`; only `cancel_requested` was set. NOTE(review): presumably the
    /// worker notices the flag and calls `finalize_cancelled` — confirm with the worker loop.
    PendingOnWorker,
    /// The job was already in the contained terminal status; nothing was changed.
    AlreadyTerminal(JobStatus),
}
/// Result of `enqueue`: either a freshly inserted job, or the job that already owned the
/// supplied idempotency key.
#[derive(Clone, Debug)]
pub enum EnqueueOutcome {
    /// A new row was inserted by this call.
    Created(Job),
    /// No row was inserted; this is the existing job with the same idempotency key.
    Existing(Job),
}

impl EnqueueOutcome {
    /// Borrows the job, whether it was just created or already existed.
    pub fn job(&self) -> &Job {
        let (Self::Created(job) | Self::Existing(job)) = self;
        job
    }

    /// Returns `true` when this call inserted the job, `false` when it was deduplicated.
    pub fn is_new(&self) -> bool {
        !matches!(self, Self::Existing(_))
    }
}
pub async fn enqueue(pool: &PgPool, new: NewJob) -> Result<EnqueueOutcome, JobError> {
crate::payload::validate(new.kind, &new.payload)?;
let id = JobId::new();
let max_attempts = new.max_attempts.unwrap_or(3);
let insert_sql = format!(
"INSERT INTO jobs (id, kind, payload, max_attempts, idempotency_key) \
VALUES ($1, $2::job_kind, $3, $4, $5) \
ON CONFLICT (idempotency_key) WHERE idempotency_key IS NOT NULL DO NOTHING \
RETURNING {JOB_COLUMNS}"
);
let inserted: Option<Job> = query_as(&insert_sql)
.bind(id.as_uuid())
.bind(new.kind.as_str())
.bind(&new.payload)
.bind(max_attempts)
.bind(new.idempotency_key.as_deref())
.fetch_optional(pool)
.await?;
if let Some(job) = inserted {
return Ok(EnqueueOutcome::Created(job));
}
let key = new
.idempotency_key
.as_deref()
.ok_or_else(|| JobError::Db(sqlx::Error::RowNotFound))?;
let existing: Job = query_as(&format!(
"SELECT {JOB_COLUMNS} FROM jobs WHERE idempotency_key = $1"
))
.bind(key)
.fetch_optional(pool)
.await?
.ok_or_else(|| JobError::IdempotencyConflict(key.to_string()))?;
Ok(EnqueueOutcome::Existing(existing))
}
/// Claims the next runnable job for `worker_id`, or returns `None` when nothing is due.
///
/// A job is runnable when it is `queued` or `retrying`, its `run_at` is not in the future,
/// and `cancel_requested` is `FALSE`; among those, the earliest `run_at` is chosen. Rows
/// already locked by another transaction are skipped (`FOR UPDATE SKIP LOCKED`), so concurrent
/// callers do not wait on each other's candidate. Claiming sets the job to `running`,
/// increments `attempts`, and stamps `locked_at` / `locked_by` / `updated_at`.
///
/// # Errors
///
/// Returns [`JobError`] (via `From`) when the query fails.
pub async fn fetch_next(pool: &PgPool, worker_id: &str) -> Result<Option<Job>, JobError> {
    let claim_sql = format!(
        "UPDATE jobs \
         SET status = 'running'::job_status, \
         attempts = attempts + 1, \
         locked_at = now(), \
         locked_by = $1, \
         updated_at = now() \
         WHERE id = ( \
         SELECT id FROM jobs \
         WHERE status IN ('queued','retrying') \
         AND run_at <= now() \
         AND cancel_requested = FALSE \
         ORDER BY run_at \
         FOR UPDATE SKIP LOCKED \
         LIMIT 1 \
         ) \
         RETURNING {JOB_COLUMNS}"
    );

    Ok(query_as(&claim_sql)
        .bind(worker_id)
        .fetch_optional(pool)
        .await?)
}
/// Marks a `running` job as `succeeded` and clears its lock columns.
///
/// # Errors
///
/// * [`JobError::InvalidTransition`] (action `"succeed"`) when no row was updated, i.e. the
///   job does not exist or is not currently `running`;
/// * [`JobError`] (via `From`) on database failure.
pub async fn mark_succeeded(pool: &PgPool, id: JobId) -> Result<(), JobError> {
    const SUCCEED_SQL: &str = "UPDATE jobs \
        SET status = 'succeeded'::job_status, \
        locked_at = NULL, \
        locked_by = NULL, \
        updated_at = now() \
        WHERE id = $1 AND status = 'running'";

    let touched = query(SUCCEED_SQL)
        .bind(id.as_uuid())
        .execute(pool)
        .await?
        .rows_affected();

    match touched {
        0 => Err(JobError::InvalidTransition {
            id,
            status: "non-running".into(),
            action: "succeed",
        }),
        _ => Ok(()),
    }
}
/// Records a failed attempt for a `running` job and decides what happens to it next.
///
/// Inside a transaction the job row is locked (`FOR UPDATE`) while `attempts` and
/// `max_attempts` are read. The row is then updated to:
///
/// * `failed_permanent` when `attempts >= max_attempts` (`run_at` left unchanged);
/// * otherwise `cancelled` when `cancel_requested` was set while it ran (`run_at` unchanged);
/// * otherwise `retrying`, with `run_at` moved to now + `backoff_for_attempt(attempts)`
///   (or now + 60 s if that backoff does not fit in a `chrono::Duration`).
///
/// In every case `last_error` is set to `error_msg` and `locked_at` / `locked_by` are cleared.
/// The updated job is returned.
///
/// # Errors
///
/// * [`JobError::InvalidTransition`] (action `"fail"`) when the job does not exist or is not
///   `running`;
/// * [`JobError`] (via `From`) on database failure.
pub async fn mark_failed_or_retry(
    pool: &PgPool,
    id: JobId,
    error_msg: &str,
) -> Result<Job, JobError> {
    let mut tx: Transaction<'_, Postgres> = pool.begin().await?;
    let (attempts, max_attempts): (i32, i32) = query_as(
        "SELECT attempts, max_attempts FROM jobs WHERE id = $1 AND status = 'running' FOR UPDATE",
    )
    .bind(id.as_uuid())
    .fetch_optional(&mut *tx)
    .await?
    // Lazily build the error so the success path does not allocate the status string.
    .ok_or_else(|| JobError::InvalidTransition {
        id,
        status: "non-running".into(),
        action: "fail",
    })?;
    let exhausted = attempts >= max_attempts;
    let next_run_at: DateTime<Utc> = if exhausted {
        // Not used by the UPDATE when exhausted ($4 keeps run_at), but $3 must still be bound.
        Utc::now()
    } else {
        let backoff = backoff_for_attempt(attempts);
        Utc::now() + chrono::Duration::from_std(backoff).unwrap_or(chrono::Duration::seconds(60))
    };
    // A job whose cancellation was requested while it ran must not go back to `retrying`:
    // `fetch_next` never claims rows with `cancel_requested = TRUE`, so it would sit in
    // `retrying` forever without reaching a terminal state. Finish it as `cancelled` instead.
    let sql = format!(
        "UPDATE jobs \
         SET status = CASE WHEN $4 THEN 'failed_permanent'::job_status \
         WHEN cancel_requested THEN 'cancelled'::job_status \
         ELSE 'retrying'::job_status END, \
         last_error = $2, \
         run_at = CASE WHEN $4 OR cancel_requested THEN run_at ELSE $3 END, \
         locked_at = NULL, \
         locked_by = NULL, \
         updated_at = now() \
         WHERE id = $1 \
         RETURNING {JOB_COLUMNS}"
    );
    let job: Job = query_as(&sql)
        .bind(id.as_uuid())
        .bind(error_msg)
        .bind(next_run_at)
        .bind(exhausted)
        .fetch_one(&mut *tx)
        .await?;
    tx.commit().await?;
    Ok(job)
}
pub async fn request_cancel(pool: &PgPool, id: JobId) -> Result<CancelOutcome, JobError> {
let mut tx: Transaction<'_, Postgres> = pool.begin().await?;
let current: Option<String> =
query_scalar("SELECT status::text FROM jobs WHERE id = $1 FOR UPDATE")
.bind(id.as_uuid())
.fetch_optional(&mut *tx)
.await?;
let status = current
.as_deref()
.map(str::parse)
.transpose()
.map_err(|e| {
JobError::Db(sqlx::Error::ColumnDecode {
index: "status".into(),
source: Box::new(e),
})
})?
.ok_or(JobError::NotFound(id))?;
let outcome = match status {
JobStatus::Queued | JobStatus::Retrying => {
query(
"UPDATE jobs \
SET status = 'cancelled'::job_status, \
cancel_requested = TRUE, \
updated_at = now() \
WHERE id = $1",
)
.bind(id.as_uuid())
.execute(&mut *tx)
.await?;
CancelOutcome::CancelledNow
}
JobStatus::Running => {
query(
"UPDATE jobs \
SET cancel_requested = TRUE, \
updated_at = now() \
WHERE id = $1",
)
.bind(id.as_uuid())
.execute(&mut *tx)
.await?;
CancelOutcome::PendingOnWorker
}
terminal @ (JobStatus::Succeeded | JobStatus::FailedPermanent | JobStatus::Cancelled) => {
CancelOutcome::AlreadyTerminal(terminal)
}
};
tx.commit().await?;
Ok(outcome)
}
/// Moves a `running` job to `cancelled` and clears its lock columns.
///
/// The update only matches a job that is still `running`. If the job is in any other state,
/// or does not exist, nothing changes and `Ok(())` is still returned, because the
/// affected-row count is not inspected. NOTE(review): unlike `mark_succeeded`, a missed
/// transition is not reported — confirm that this is intended.
///
/// # Errors
///
/// Returns [`JobError`] (via `From`) on database failure.
pub async fn finalize_cancelled(pool: &PgPool, id: JobId) -> Result<(), JobError> {
    const FINALIZE_SQL: &str = "UPDATE jobs \
        SET status = 'cancelled'::job_status, \
        locked_at = NULL, \
        locked_by = NULL, \
        updated_at = now() \
        WHERE id = $1 AND status = 'running'";

    // The query result (rows affected) is intentionally not examined; see the doc comment.
    let _ = query(FINALIZE_SQL).bind(id.as_uuid()).execute(pool).await?;
    Ok(())
}
/// Fetches a single job by id, returning `Ok(None)` when no such job exists.
///
/// # Errors
///
/// Returns [`JobError`] (via `From`) on database failure.
pub async fn get(pool: &PgPool, id: JobId) -> Result<Option<Job>, JobError> {
    let by_id_sql = format!("SELECT {JOB_COLUMNS} FROM jobs WHERE id = $1");
    Ok(query_as(&by_id_sql)
        .bind(id.as_uuid())
        .fetch_optional(pool)
        .await?)
}
/// Lists jobs newest-first, optionally filtered by status and/or kind.
///
/// * `filter.limit` is clamped to `1..=200`.
/// * A negative `filter.offset` is treated as `0`.
/// * Rows are ordered by `created_at DESC`, with `id DESC` as a tie-breaker, so LIMIT/OFFSET
///   pages stay stable when several jobs share a `created_at` value.
///
/// # Errors
///
/// Returns [`JobError`] (via `From`) on database failure.
pub async fn list(pool: &PgPool, filter: ListFilter) -> Result<Vec<Job>, JobError> {
    let limit = filter.limit.clamp(1, 200);
    let offset = filter.offset.max(0);
    // Without a unique ORDER BY, Postgres may return tied rows in any order from one query to
    // the next, so consecutive pages could repeat or skip jobs created at the same instant.
    // `id` (presumably the primary key) makes the ordering total.
    let rows = query_as(&format!(
        "SELECT {JOB_COLUMNS} \
         FROM jobs \
         WHERE ($1::text IS NULL OR status::text = $1) \
         AND ($2::text IS NULL OR kind::text = $2) \
         ORDER BY created_at DESC, id DESC \
         LIMIT $3 OFFSET $4"
    ))
    .bind(filter.status.map(JobStatus::as_str))
    .bind(filter.kind.map(JobKind::as_str))
    .bind(limit)
    .bind(offset)
    .fetch_all(pool)
    .await?;
    Ok(rows)
}
/// Releases jobs whose worker appears to have died mid-execution.
///
/// Any job still `running` whose `locked_at` is more than `stale_seconds` seconds old has
/// `locked_at` / `locked_by` cleared and is moved to:
///
/// * `failed_permanent` when its attempts are used up (`attempts >= max_attempts`, the same
///   rule `mark_failed_or_retry` applies);
/// * otherwise `cancelled` when a cancellation was requested while it was running;
/// * otherwise `retrying`. `run_at` is left unchanged; it was already due when
///   `fetch_next` claimed the job, so the job is eligible again right away.
///
/// `last_error` is set to a recovery note unless it already holds an earlier error.
/// Returns the number of jobs recovered.
///
/// # Errors
///
/// Returns [`JobError`] (via `From`) on database failure.
pub async fn recover_stale(pool: &PgPool, stale_seconds: i64) -> Result<u64, JobError> {
    // Sending every stale job back to `retrying` would (a) retry a job that crashes its worker
    // every time forever, ignoring `max_attempts` (`fetch_next` does not check it), and
    // (b) strand jobs with `cancel_requested = TRUE` in `retrying`, since `fetch_next` never
    // claims such rows. Route those to a terminal status instead.
    let result = query(
        "UPDATE jobs \
         SET status = CASE \
         WHEN attempts >= max_attempts THEN 'failed_permanent'::job_status \
         WHEN cancel_requested THEN 'cancelled'::job_status \
         ELSE 'retrying'::job_status END, \
         last_error = COALESCE(last_error, 'recovered: worker crashed mid-execution'), \
         locked_at = NULL, \
         locked_by = NULL, \
         updated_at = now() \
         WHERE status = 'running' \
         AND locked_at < now() - ($1 || ' seconds')::interval",
    )
    .bind(stale_seconds.to_string())
    .execute(pool)
    .await?;
    Ok(result.rows_affected())
}