pub mod setup;
use awa_model::{AwaError, JobArgs, JobRow};
use awa_worker::context::ProgressState;
use awa_worker::{JobContext, JobError, JobResult, Worker};
use sqlx::PgPool;
use std::any::Any;
use std::collections::HashMap;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
/// Test helper that drives jobs stored in the `awa` schema directly through a
/// Postgres pool: it can run migrations, wipe job tables, insert jobs, and
/// claim-and-execute a single job against a given worker.
pub struct TestClient {
/// Connection pool used for every statement this client issues.
pool: PgPool,
}
impl TestClient {
pub async fn from_pool(pool: PgPool) -> Self {
Self { pool }
}
pub fn pool(&self) -> &PgPool {
&self.pool
}
/// Runs `awa_model::migrations::run` against this client's pool.
///
/// # Errors
/// Propagates whatever error the migration runner returns.
pub async fn migrate(&self) -> Result<(), AwaError> {
    let pool = self.pool();
    awa_model::migrations::run(pool).await
}
/// Deletes every row from `awa.jobs` and then from `awa.queue_meta`.
///
/// The two statements are executed one after the other directly on the pool
/// (no surrounding transaction), so if the second one fails the first
/// deletion has already taken effect.
///
/// # Errors
/// Returns the first database error encountered.
pub async fn clean(&self) -> Result<(), AwaError> {
    for statement in ["DELETE FROM awa.jobs", "DELETE FROM awa.queue_meta"] {
        sqlx::query(statement).execute(&self.pool).await?;
    }
    Ok(())
}
/// Inserts a job built from `args` by delegating to `awa_model::insert`
/// with this client's pool, and returns the resulting row.
///
/// # Errors
/// Propagates whatever error `awa_model::insert` returns.
pub async fn insert(&self, args: &impl JobArgs) -> Result<JobRow, AwaError> {
    let pool = self.pool();
    awa_model::insert(pool, args).await
}
/// Claims and runs at most one job for `worker`, without restricting the
/// queue. Equivalent to `work_one_in_queue(worker, None)`.
///
/// # Errors
/// Propagates any error from `work_one_in_queue`.
pub async fn work_one<W: Worker>(&self, worker: &W) -> Result<WorkResult, AwaError> {
    // `None` disables the queue filter in the claim query.
    let any_queue: Option<&str> = None;
    self.work_one_in_queue(worker, any_queue).await
}
/// Claims at most one `available` job whose kind equals `worker.kind()`,
/// runs `worker.perform` on it inline, and records the outcome on the row.
///
/// The claim selects the job with the earliest `run_at` (ties broken by
/// `id`), skipping rows locked by other transactions, and marks it `running`
/// with `attempt` and `run_lease` incremented and a deadline five minutes
/// from now. When `queue` is `Some`, only jobs in that queue are eligible;
/// `None` matches any queue.
///
/// Outcome mapping (what is written to `awa.jobs` and what is returned):
///
/// * `Ok(Completed)` -> state `completed`, `finalized_at` set, `progress`
///   cleared; returns [`WorkResult::Completed`].
/// * `Ok(Cancel(reason))` -> state `cancelled`, `finalized_at` set;
///   returns [`WorkResult::Cancelled`].
/// * `Ok(RetryAfter(_))` / `Err(Retryable(_))` -> state `retryable`,
///   `finalized_at` set; the requested delay is ignored; returns
///   [`WorkResult::Retryable`].
/// * `Ok(Snooze(_))` -> state back to `available` with `attempt`
///   decremented; the snooze duration is ignored; returns
///   [`WorkResult::Snoozed`].
/// * `Ok(WaitForCallback(_))` -> if the row has a non-null `callback_id`,
///   state `waiting_external` with heartbeat/deadline cleared and the
///   re-read row is returned in [`WorkResult::WaitingExternal`]; otherwise
///   the job is marked `failed` and [`WorkResult::Failed`] is returned.
/// * `Err(Terminal(msg))` -> state `failed`, `finalized_at` set; returns
///   [`WorkResult::Failed`].
///
/// Except for `Completed` (which clears it) and the callback-less
/// `WaitForCallback` failure (which leaves the column untouched), the latest
/// progress snapshot taken after `perform` returns is stored in `progress`.
///
/// Apart from `WaitingExternal`, the `JobRow` inside the result is the row as
/// returned by the claim statement, not re-read after the final update.
///
/// # Errors
/// Returns an error if any SQL statement fails. Worker failures are not
/// returned as `Err`; they are reported through the `WorkResult` variants.
///
/// # Panics
/// Panics if the progress mutex has been poisoned.
pub async fn work_one_in_queue<W: Worker>(
    &self,
    worker: &W,
    queue: Option<&str>,
) -> Result<WorkResult, AwaError> {
    // `LIMIT 1` means the statement yields zero or one row, so ask for an
    // optional row instead of materialising a Vec.
    let claimed: Option<JobRow> = sqlx::query_as::<_, JobRow>(
        r#"
        WITH claimed AS (
            SELECT id FROM awa.jobs
            WHERE state = 'available' AND kind = $1
              AND ($2::text IS NULL OR queue = $2)
            ORDER BY run_at ASC, id ASC
            LIMIT 1
            FOR UPDATE SKIP LOCKED
        )
        UPDATE awa.jobs
        SET state = 'running',
            attempt = attempt + 1,
            run_lease = run_lease + 1,
            attempted_at = now(),
            heartbeat_at = now(),
            deadline_at = now() + interval '5 minutes'
        FROM claimed
        WHERE awa.jobs.id = claimed.id
        RETURNING awa.jobs.*
        "#,
    )
    .bind(worker.kind())
    .bind(queue)
    .fetch_optional(&self.pool)
    .await?;

    let job = match claimed {
        Some(job) => job,
        None => return Ok(WorkResult::NoJob),
    };

    // Context pieces: a cancel flag that starts `false` and is never set
    // here, an empty shared-state map, and a progress tracker seeded from
    // the job's stored progress.
    let cancel = Arc::new(AtomicBool::new(false));
    let state: Arc<HashMap<std::any::TypeId, Box<dyn Any + Send + Sync>>> =
        Arc::new(HashMap::new());
    let progress = Arc::new(std::sync::Mutex::new(ProgressState::new(
        job.progress.clone(),
    )));
    let ctx = JobContext::new(
        job.clone(),
        cancel,
        state,
        self.pool.clone(),
        progress.clone(),
    );

    let result = worker.perform(&ctx).await;

    // Snapshot progress in its own scope so the guard is released before
    // the next `.await`.
    let progress_snapshot: Option<serde_json::Value> = {
        let guard = progress.lock().expect("progress lock poisoned");
        guard.clone_latest()
    };

    match &result {
        Ok(JobResult::Completed) => {
            self.apply_job_update(
                "UPDATE awa.jobs SET state = 'completed', finalized_at = now(), progress = NULL WHERE id = $1",
                job.id,
                None,
            )
            .await?;
            Ok(WorkResult::Completed(job))
        }
        Ok(JobResult::Cancel(reason)) => {
            self.apply_job_update(
                "UPDATE awa.jobs SET state = 'cancelled', finalized_at = now(), progress = $2 WHERE id = $1",
                job.id,
                Some(&progress_snapshot),
            )
            .await?;
            Ok(WorkResult::Cancelled(job, reason.clone()))
        }
        Ok(JobResult::RetryAfter(_)) | Err(JobError::Retryable(_)) => {
            self.apply_job_update(
                "UPDATE awa.jobs SET state = 'retryable', finalized_at = now(), progress = $2 WHERE id = $1",
                job.id,
                Some(&progress_snapshot),
            )
            .await?;
            Ok(WorkResult::Retryable(job))
        }
        Ok(JobResult::Snooze(_)) => {
            // Undo the attempt increment made by the claim so a snooze does
            // not count as an attempt.
            self.apply_job_update(
                "UPDATE awa.jobs SET state = 'available', attempt = attempt - 1, progress = $2 WHERE id = $1",
                job.id,
                Some(&progress_snapshot),
            )
            .await?;
            Ok(WorkResult::Snoozed(job))
        }
        Ok(JobResult::WaitForCallback(_)) => {
            // Re-read `callback_id` from the database: the claimed `job`
            // snapshot predates anything the worker did during `perform`.
            let has_callback: Option<(Option<uuid::Uuid>,)> =
                sqlx::query_as("SELECT callback_id FROM awa.jobs WHERE id = $1")
                    .bind(job.id)
                    .fetch_optional(&self.pool)
                    .await?;
            match has_callback {
                Some((Some(_),)) => {
                    self.apply_job_update(
                        "UPDATE awa.jobs SET state = 'waiting_external', heartbeat_at = NULL, deadline_at = NULL, progress = $2 WHERE id = $1",
                        job.id,
                        Some(&progress_snapshot),
                    )
                    .await?;
                    let updated = self.get_job(job.id).await?;
                    Ok(WorkResult::WaitingExternal(updated))
                }
                // Row missing or `callback_id` is NULL: treat as a failure.
                // The `progress` column is left untouched in this branch.
                _ => {
                    self.apply_job_update(
                        "UPDATE awa.jobs SET state = 'failed', finalized_at = now() WHERE id = $1",
                        job.id,
                        None,
                    )
                    .await?;
                    Ok(WorkResult::Failed(
                        job,
                        "WaitForCallback returned without calling register_callback"
                            .to_string(),
                    ))
                }
            }
        }
        Err(JobError::Terminal(msg)) => {
            self.apply_job_update(
                "UPDATE awa.jobs SET state = 'failed', finalized_at = now(), progress = $2 WHERE id = $1",
                job.id,
                Some(&progress_snapshot),
            )
            .await?;
            Ok(WorkResult::Failed(job, msg.clone()))
        }
    }
}

/// Executes one `UPDATE` statement for a job, binding `job_id` as `$1` and,
/// when `progress` is `Some`, the progress snapshot as `$2`.
///
/// `progress` must be `None` for statements that only reference `$1`.
async fn apply_job_update(
    &self,
    sql: &'static str,
    job_id: i64,
    progress: Option<&Option<serde_json::Value>>,
) -> Result<(), AwaError> {
    let query = sqlx::query(sql).bind(job_id);
    let query = match progress {
        Some(snapshot) => query.bind(snapshot),
        None => query,
    };
    query.execute(&self.pool).await?;
    Ok(())
}
/// Looks up a job by id by delegating to `awa_model::admin::get_job` with
/// this client's pool.
///
/// # Errors
/// Propagates whatever error `awa_model::admin::get_job` returns.
pub async fn get_job(&self, job_id: i64) -> Result<JobRow, AwaError> {
    let pool = self.pool();
    awa_model::admin::get_job(pool, job_id).await
}
}
/// Outcome of a single [`TestClient::work_one`] /
/// [`TestClient::work_one_in_queue`] call.
///
/// Unless noted otherwise, the carried `JobRow` is the row as returned by the
/// claim statement (already `running`), not re-read after the final update.
#[derive(Debug)]
pub enum WorkResult {
/// No `available` job matched the worker's kind (and queue filter, if any).
NoJob,
/// The worker returned `JobResult::Completed`.
Completed(JobRow),
/// The worker returned `JobResult::RetryAfter` or `JobError::Retryable`.
Retryable(JobRow),
/// The worker returned `JobResult::Snooze`.
Snoozed(JobRow),
/// The worker returned `JobResult::Cancel`; the string is its reason.
Cancelled(JobRow, String),
/// The worker returned `JobError::Terminal`, or returned
/// `JobResult::WaitForCallback` while the row had no `callback_id`; the
/// string is the failure message.
Failed(JobRow, String),
/// The worker returned `JobResult::WaitForCallback` and the row has a
/// `callback_id`. The carried row is re-read after the state update.
WaitingExternal(JobRow),
}
impl WorkResult {
    /// Returns `true` if this is [`WorkResult::Completed`].
    pub fn is_completed(&self) -> bool {
        matches!(self, WorkResult::Completed(_))
    }

    /// Returns `true` if this is [`WorkResult::Retryable`].
    pub fn is_retryable(&self) -> bool {
        matches!(self, WorkResult::Retryable(_))
    }

    /// Returns `true` if this is [`WorkResult::Snoozed`].
    pub fn is_snoozed(&self) -> bool {
        matches!(self, WorkResult::Snoozed(_))
    }

    /// Returns `true` if this is [`WorkResult::Cancelled`].
    pub fn is_cancelled(&self) -> bool {
        matches!(self, WorkResult::Cancelled(_, _))
    }

    /// Returns `true` if this is [`WorkResult::Failed`].
    pub fn is_failed(&self) -> bool {
        matches!(self, WorkResult::Failed(_, _))
    }

    /// Returns `true` if this is [`WorkResult::NoJob`].
    pub fn is_no_job(&self) -> bool {
        matches!(self, WorkResult::NoJob)
    }

    /// Returns `true` if this is [`WorkResult::WaitingExternal`].
    pub fn is_waiting_external(&self) -> bool {
        matches!(self, WorkResult::WaitingExternal(_))
    }
}