graphile_worker_admin_ui 0.2.0

Embedded Leptos admin UI for graphile_worker
Documentation
use std::sync::Arc;

use axum::extract::State;
use axum::Json;
use chrono::Utc;
use graphile_worker::worker_utils::types::RescheduleJobOptions;
use graphile_worker::DbJob;
use graphile_worker_admin_api::queries::jobs::tasks::task_identifiers_by_id;

use super::super::super::error::ApiError;
use super::super::super::state::AppState;
use super::super::super::types::{
    db_job_output_from_db_job, JobAction, JobActionRequest, JobActionResponse, MessageResponse,
    RemoveJobByKeyRequest,
};
use super::super::shared::ensure_write_allowed;

pub(crate) async fn job_action(
    State(state): State<Arc<AppState>>,
    Json(request): Json<JobActionRequest>,
) -> Result<Json<JobActionResponse>, ApiError> {
    ensure_write_allowed(&state)?;
    if request.ids.is_empty() {
        return Err(ApiError::bad_request("select at least one job"));
    }

    match request.action {
        JobAction::Complete => {
            let jobs = state
                .utils
                .complete_jobs(&request.ids)
                .await
                .map_err(ApiError::internal)?;
            action_response(&state, "Completed", &jobs).await
        }
        JobAction::Fail => {
            let reason = request
                .reason
                .as_deref()
                .filter(|reason| !reason.trim().is_empty())
                .unwrap_or("Marked failed from Graphile Worker admin UI");
            let jobs = state
                .utils
                .permanently_fail_jobs(&request.ids, reason)
                .await
                .map_err(ApiError::internal)?;
            action_response(&state, "Failed", &jobs).await
        }
        JobAction::RunNow => {
            let jobs = state
                .utils
                .reschedule_jobs(
                    &request.ids,
                    RescheduleJobOptions {
                        run_at: Some(Utc::now()),
                        priority: request.priority,
                        attempts: request.attempts,
                        max_attempts: request.max_attempts,
                    },
                )
                .await
                .map_err(ApiError::internal)?;
            action_response(&state, "Run now", &jobs).await
        }
        JobAction::Reschedule => reschedule_jobs(state, request).await,
    }
}

pub(crate) async fn remove_job_by_key(
    State(state): State<Arc<AppState>>,
    Json(request): Json<RemoveJobByKeyRequest>,
) -> Result<Json<MessageResponse>, ApiError> {
    ensure_write_allowed(&state)?;
    if request.key.trim().is_empty() {
        return Err(ApiError::bad_request("key is required"));
    }

    state
        .utils
        .remove_job(&request.key)
        .await
        .map_err(ApiError::internal)?;

    Ok(Json(MessageResponse {
        message: format!("Removed job with key `{}`", request.key),
    }))
}

async fn reschedule_jobs(
    state: Arc<AppState>,
    request: JobActionRequest,
) -> Result<Json<JobActionResponse>, ApiError> {
    if request.run_at.is_none()
        && request.priority.is_none()
        && request.attempts.is_none()
        && request.max_attempts.is_none()
    {
        return Err(ApiError::bad_request(
            "provide run_at, priority, attempts, or max_attempts",
        ));
    }

    let jobs = state
        .utils
        .reschedule_jobs(
            &request.ids,
            RescheduleJobOptions {
                run_at: request.run_at,
                priority: request.priority,
                attempts: request.attempts,
                max_attempts: request.max_attempts,
            },
        )
        .await
        .map_err(ApiError::internal)?;
    action_response(&state, "Rescheduled", &jobs).await
}

async fn action_response(
    state: &AppState,
    action: &str,
    jobs: &[DbJob],
) -> Result<Json<JobActionResponse>, ApiError> {
    let identifiers = task_identifiers_by_id(
        &state.pool,
        &state.schema,
        jobs.iter().map(|job| *job.task_id()),
    )
    .await?;

    Ok(Json(JobActionResponse {
        message: format!("{action} {} job(s)", jobs.len()),
        jobs: jobs
            .iter()
            .map(|job| db_job_output_from_db_job(job, identifiers.get(job.task_id()).cloned()))
            .collect(),
    }))
}