dialtone_sqlx 0.1.0

Dialtone SQLx Back-End
Documentation
use crate::db::persistent_queue::JobDbType;
use crate::logic::persistent_queue::job_details::JobDetails;
use crate::logic::persistent_queue::persistent_job::PersistentJob;
use sqlx::types::Json;
use sqlx::{Executor, Postgres, Row};

pub async fn pop_queue(
    exec: impl Executor<'_, Database = Postgres>,
    job_type: &JobDbType,
) -> Result<Option<PersistentJob>, sqlx::Error> {
    let query = sqlx::query(
        r#"
            delete from job_queue
            where
                id =
                    (
                        select id
                        from job_queue
                        where
                            job_type = $1
                            and
                            attempts_remaining > 0
                            and
                            no_sooner_than <= now()
                        order by id
                        limit 1
                        for update skip locked
                    )
            returning id, attempts_remaining, details
        "#,
    )
    .bind(job_type);
    let result = query.fetch_optional(exec).await?;
    match result {
        None => Ok(None),
        Some(row) => {
            let job_id = row.try_get::<i32, usize>(0)?;
            let attempts_remaining = row.try_get::<i32, usize>(1)?;
            let job_details = row
                .try_get::<Json<JobDetails>, usize>(2)
                .map(|json| json.0)?;
            Ok(Some(PersistentJob {
                job_id,
                attempts_remaining,
                job_details,
            }))
        }
    }
}