use crate::db::persistent_queue::JobDbType;
use crate::logic::persistent_queue::job_details::JobDetails;
use crate::logic::persistent_queue::persistent_job::PersistentJob;
use sqlx::types::Json;
use sqlx::{Executor, Postgres, Row};
pub async fn pop_queue(
exec: impl Executor<'_, Database = Postgres>,
job_type: &JobDbType,
) -> Result<Option<PersistentJob>, sqlx::Error> {
let query = sqlx::query(
r#"
delete from job_queue
where
id =
(
select id
from job_queue
where
job_type = $1
and
attempts_remaining > 0
and
no_sooner_than <= now()
order by id
limit 1
for update skip locked
)
returning id, attempts_remaining, details
"#,
)
.bind(job_type);
let result = query.fetch_optional(exec).await?;
match result {
None => Ok(None),
Some(row) => {
let job_id = row.try_get::<i32, usize>(0)?;
let attempts_remaining = row.try_get::<i32, usize>(1)?;
let job_details = row
.try_get::<Json<JobDetails>, usize>(2)
.map(|json| json.0)?;
Ok(Some(PersistentJob {
job_id,
attempts_remaining,
job_details,
}))
}
}
}