graphile_worker_admin_ui 0.1.2

Embedded Leptos admin UI for graphile_worker
Documentation
use std::collections::{BTreeSet, HashMap};

use indoc::formatdoc;
use sqlx::{PgPool, Postgres, QueryBuilder};

use super::error::{ApiError, Result};
use super::types::{JobState, JobStats, ListJobsParams, ListedJob, LockedWorkerRow, QueueRow};

/// Fetches one page of jobs that match the filters in `args`, ordered by ascending job id.
///
/// `escaped_schema` is interpolated verbatim into the SQL text, so callers must pass an
/// identifier that is already safe to embed. The page size is capped at 500 rows no matter
/// how large the requested `limit` is.
///
/// # Errors
///
/// Returns a bad-request [`ApiError`] when `limit` or `offset` is negative (the `limit`
/// check is reported first), and converts any database failure into an [`ApiError`].
pub(crate) async fn list_jobs(
    pool: &PgPool,
    escaped_schema: &str,
    args: &ListJobsParams,
) -> Result<Vec<ListedJob>, ApiError> {
    // Validate paging before touching the database.
    let paging_problem = if args.limit < 0 {
        Some("limit must be greater than or equal to 0")
    } else if args.offset < 0 {
        Some("offset must be greater than or equal to 0")
    } else {
        None
    };
    if let Some(message) = paging_problem {
        return Err(ApiError::bad_request(message));
    }

    // `where true` lets every optional filter be appended uniformly as ` and …`.
    let base_select = formatdoc!(
        r#"
            select
                jobs.id,
                tasks.identifier as task_identifier,
                job_queues.queue_name,
                jobs.payload,
                jobs.priority,
                jobs.run_at,
                jobs.attempts,
                jobs.max_attempts,
                jobs.last_error,
                jobs.created_at,
                jobs.updated_at,
                jobs.key,
                jobs.locked_at,
                jobs.locked_by,
                jobs.revision,
                jobs.flags,
                jobs.is_available
            from {escaped_schema}._private_jobs as jobs
            inner join {escaped_schema}._private_tasks as tasks on tasks.id = jobs.task_id
            left join {escaped_schema}._private_job_queues as job_queues on job_queues.id = jobs.job_queue_id
            where true
        "#
    );

    let mut builder = QueryBuilder::<Postgres>::new(base_select);
    apply_job_filters(&mut builder, args);
    builder
        .push(" order by jobs.id asc limit ")
        .push_bind(args.limit.min(500))
        .push(" offset ")
        .push_bind(args.offset);

    let jobs = builder
        .build_query_as::<ListedJob>()
        .fetch_all(pool)
        .await?;
    Ok(jobs)
}

pub(crate) async fn get_job(
    pool: &PgPool,
    escaped_schema: &str,
    id: i64,
) -> Result<ListedJob, ApiError> {
    let mut query = QueryBuilder::<Postgres>::new(formatdoc!(
        r#"
            select
                jobs.id,
                tasks.identifier as task_identifier,
                job_queues.queue_name,
                jobs.payload,
                jobs.priority,
                jobs.run_at,
                jobs.attempts,
                jobs.max_attempts,
                jobs.last_error,
                jobs.created_at,
                jobs.updated_at,
                jobs.key,
                jobs.locked_at,
                jobs.locked_by,
                jobs.revision,
                jobs.flags,
                jobs.is_available
            from {escaped_schema}._private_jobs as jobs
            inner join {escaped_schema}._private_tasks as tasks on tasks.id = jobs.task_id
            left join {escaped_schema}._private_job_queues as job_queues on job_queues.id = jobs.job_queue_id
            where jobs.id =
        "#
    ));
    query.push_bind(id);

    query
        .build_query_as()
        .fetch_one(pool)
        .await
        .map_err(|error| job_lookup_error(id, error))
}

pub(crate) fn job_lookup_error(id: i64, error: sqlx::Error) -> ApiError {
    match error {
        sqlx::Error::RowNotFound => ApiError::not_found(format!("job {id} not found")),
        error => error.into(),
    }
}

pub(crate) async fn task_identifiers_by_id(
    pool: &PgPool,
    escaped_schema: &str,
    task_ids: impl IntoIterator<Item = i32>,
) -> Result<HashMap<i32, String>, ApiError> {
    let task_ids = task_ids.into_iter().collect::<BTreeSet<_>>();
    if task_ids.is_empty() {
        return Ok(HashMap::new());
    }

    let mut query = QueryBuilder::<Postgres>::new(formatdoc!(
        r#"
            select id, identifier
            from {escaped_schema}._private_tasks
            where id in (
        "#
    ));
    let mut separated = query.separated(", ");
    for task_id in task_ids {
        separated.push_bind(task_id);
    }
    separated.push_unseparated(")");

    Ok(query
        .build_query_as::<(i32, String)>()
        .fetch_all(pool)
        .await?
        .into_iter()
        .collect())
}

/// Appends the optional job filters from `args` to `query` as ` and …` clauses.
///
/// The builder is expected to already end in a `where` condition (for example `where true`)
/// and to expose the `jobs`, `tasks` and `job_queues` aliases referenced by the clauses.
///
/// Filters applied, each only when present:
/// - `identifier`: exact match on `tasks.identifier` (ignored when empty).
/// - `queue`: exact match on `job_queues.queue_name` (ignored when empty).
/// - `search`: after trimming, a case-insensitive *literal* substring match against the job
///   id, task identifier, queue name, key, locking worker, last error and payload text
///   (ignored when blank).
/// - `state`: restricts rows by lock status, attempt count and `run_at` relative to `now()`;
///   `JobState::All` adds nothing.
pub(crate) fn apply_job_filters<'a>(
    query: &mut QueryBuilder<'a, Postgres>,
    args: &'a ListJobsParams,
) {
    /// Builds a `%term%` pattern in which `term` is matched literally.
    ///
    /// `%` and `_` are LIKE wildcards and `\` is the default LIKE escape character, so each
    /// of them is prefixed with a backslash. Without this, a search for `send_email` would
    /// also match `sendXemail`, and a term ending in `\` would escape the trailing `%`.
    fn like_contains_pattern(term: &str) -> String {
        let mut pattern = String::with_capacity(term.len() + 2);
        pattern.push('%');
        for ch in term.chars() {
            if matches!(ch, '\\' | '%' | '_') {
                pattern.push('\\');
            }
            pattern.push(ch);
        }
        pattern.push('%');
        pattern
    }

    // SQL expressions the free-text search is matched against, in emission order.
    const SEARCHED_EXPRESSIONS: [&str; 7] = [
        "jobs.id::text",
        "tasks.identifier",
        "coalesce(job_queues.queue_name, '')",
        "coalesce(jobs.key, '')",
        "coalesce(jobs.locked_by, '')",
        "coalesce(jobs.last_error, '')",
        "jobs.payload::text",
    ];

    if let Some(identifier) = args.identifier.as_ref().filter(|value| !value.is_empty()) {
        query.push(" and tasks.identifier = ");
        query.push_bind(identifier);
    }

    if let Some(queue) = args.queue.as_ref().filter(|value| !value.is_empty()) {
        query.push(" and job_queues.queue_name = ");
        query.push_bind(queue);
    }

    if let Some(search) = args
        .search
        .as_deref()
        .map(str::trim)
        .filter(|value| !value.is_empty())
    {
        let pattern = like_contains_pattern(search);
        // Emits ` and (<expr> ilike $n or <expr> ilike $n+1 …)`. The pattern is an owned
        // local, so each bind needs its own copy to satisfy the builder's lifetime.
        for (index, expression) in SEARCHED_EXPRESSIONS.iter().enumerate() {
            query.push(if index == 0 { " and (" } else { " or " });
            query.push(expression);
            query.push(" ilike ");
            query.push_bind(pattern.clone());
        }
        query.push(")");
    }

    match args.state {
        JobState::All => {}
        JobState::Ready => {
            // Unlocked, attempts remaining, and due now or in the past.
            query.push(
                " and jobs.locked_at is null and jobs.attempts < jobs.max_attempts and jobs.run_at <= now()",
            );
        }
        JobState::Scheduled => {
            // Unlocked, attempts remaining, but not due yet.
            query.push(
                " and jobs.locked_at is null and jobs.attempts < jobs.max_attempts and jobs.run_at > now()",
            );
        }
        JobState::Locked => {
            query.push(" and jobs.locked_at is not null");
        }
        JobState::Failed => {
            // Unlocked with no attempts left.
            query.push(" and jobs.locked_at is null and jobs.attempts >= jobs.max_attempts");
        }
    }
}

/// Computes aggregate job counts in a single pass over the jobs table.
///
/// The result contains the total row count plus one count per state:
/// - `ready`: unlocked, attempts remaining, `run_at` not in the future;
/// - `scheduled`: unlocked, attempts remaining, `run_at` in the future;
/// - `locked`: `locked_at` is set;
/// - `failed`: unlocked with `attempts >= max_attempts`.
///
/// `escaped_schema` is interpolated verbatim into the SQL text, so callers must pass an
/// identifier that is already safe to embed.
///
/// # Errors
///
/// Any database failure is converted into an [`ApiError`].
pub(crate) async fn get_stats(pool: &PgPool, escaped_schema: &str) -> Result<JobStats, ApiError> {
    let statement = formatdoc!(
        r#"
            select
                count(*)::bigint as total,
                count(*) filter (
                    where locked_at is null
                    and attempts < max_attempts
                    and run_at <= now()
                )::bigint as ready,
                count(*) filter (
                    where locked_at is null
                    and attempts < max_attempts
                    and run_at > now()
                )::bigint as scheduled,
                count(*) filter (where locked_at is not null)::bigint as locked,
                count(*) filter (
                    where locked_at is null
                    and attempts >= max_attempts
                )::bigint as failed
            from {escaped_schema}._private_jobs
        "#
    );

    let stats = sqlx::query_as::<_, JobStats>(&statement)
        .fetch_one(pool)
        .await?;
    Ok(stats)
}

/// Lists every job queue, ordered by queue name, with its lock columns and job counts.
///
/// Each row carries the number of jobs joined to the queue (`job_count`) and how many of
/// those are unlocked, have attempts remaining and are due (`ready_count`). Queues are
/// left-joined to jobs, so a queue without jobs is still returned.
///
/// `escaped_schema` is interpolated verbatim into the SQL text, so callers must pass an
/// identifier that is already safe to embed.
///
/// # Errors
///
/// Any database failure is converted into an [`ApiError`].
pub(crate) async fn list_queues(
    pool: &PgPool,
    escaped_schema: &str,
) -> Result<Vec<QueueRow>, ApiError> {
    let statement = formatdoc!(
        r#"
            select
                job_queues.id,
                job_queues.queue_name,
                job_queues.locked_at,
                job_queues.locked_by,
                count(jobs.*)::bigint as job_count,
                count(jobs.*) filter (
                    where jobs.locked_at is null
                    and jobs.attempts < jobs.max_attempts
                    and jobs.run_at <= now()
                )::bigint as ready_count
            from {escaped_schema}._private_job_queues as job_queues
            left join {escaped_schema}._private_jobs as jobs on jobs.job_queue_id = job_queues.id
            group by job_queues.id, job_queues.queue_name, job_queues.locked_at, job_queues.locked_by
            order by job_queues.queue_name asc
        "#
    );

    let queues = sqlx::query_as::<_, QueueRow>(&statement)
        .fetch_all(pool)
        .await?;
    Ok(queues)
}

/// Lists every worker id that currently holds a lock, ordered by worker id.
///
/// Lock holders are gathered from two places — the `locked_by` column of the jobs table and
/// the `locked_by` column of the job-queues table — and summed per worker into
/// `locked_jobs` and `locked_queues`.
///
/// `escaped_schema` is interpolated verbatim into the SQL text, so callers must pass an
/// identifier that is already safe to embed.
///
/// # Errors
///
/// Any database failure is converted into an [`ApiError`].
pub(crate) async fn list_locked_workers(
    pool: &PgPool,
    escaped_schema: &str,
) -> Result<Vec<LockedWorkerRow>, ApiError> {
    // Each half of the union contributes zero to the other half's counter, so the outer
    // sum yields both totals for a worker in one row.
    let statement = formatdoc!(
        r#"
            select
                worker_id,
                sum(locked_jobs)::bigint as locked_jobs,
                sum(locked_queues)::bigint as locked_queues
            from (
                select locked_by as worker_id, count(*)::bigint as locked_jobs, 0::bigint as locked_queues
                from {escaped_schema}._private_jobs
                where locked_by is not null
                group by locked_by
                union all
                select locked_by as worker_id, 0::bigint as locked_jobs, count(*)::bigint as locked_queues
                from {escaped_schema}._private_job_queues
                where locked_by is not null
                group by locked_by
            ) as locks
            group by worker_id
            order by worker_id asc
        "#
    );

    let workers = sqlx::query_as::<_, LockedWorkerRow>(&statement)
        .fetch_all(pool)
        .await?;
    Ok(workers)
}