graphile_worker_admin_ui 0.2.1

Embedded Leptos admin UI for graphile_worker
Documentation
use std::sync::Arc;
use std::time::Duration;

use axum::extract::State;
use axum::Json;
use graphile_worker::SweepStaleWorkersOptions;

use super::super::error::ApiError;
use super::super::state::AppState;
use super::super::types::{
    cleanup_task_from_name, CleanupTaskName, MaintenanceAction, MaintenanceRequest, MessageResponse,
};
use super::shared::ensure_write_allowed;

pub(crate) async fn maintenance(
    State(state): State<Arc<AppState>>,
    Json(request): Json<MaintenanceRequest>,
) -> Result<Json<MessageResponse>, ApiError> {
    ensure_write_allowed(&state)?;
    match request.action {
        MaintenanceAction::Migrate => {
            state.utils.migrate().await.map_err(ApiError::internal)?;
            Ok(Json(MessageResponse {
                message: "Migrations complete".to_string(),
            }))
        }
        MaintenanceAction::Cleanup => {
            let tasks = if request.cleanup_tasks.is_empty() {
                CleanupTaskName::all()
            } else {
                request.cleanup_tasks
            };
            let cleanup_tasks: Vec<_> = tasks.into_iter().map(cleanup_task_from_name).collect();
            state
                .utils
                .cleanup(&cleanup_tasks)
                .await
                .map_err(ApiError::internal)?;
            Ok(Json(MessageResponse {
                message: "Cleanup complete".to_string(),
            }))
        }
        MaintenanceAction::ForceUnlock => {
            if request.worker_ids.is_empty() {
                return Err(ApiError::bad_request("provide at least one worker id"));
            }
            let worker_ids: Vec<_> = request.worker_ids.iter().map(String::as_str).collect();
            state
                .utils
                .force_unlock_workers(&worker_ids)
                .await
                .map_err(ApiError::internal)?;
            Ok(Json(MessageResponse {
                message: format!("Unlocked {} worker id(s)", worker_ids.len()),
            }))
        }
        MaintenanceAction::SweepStaleWorkers => {
            let result = state
                .utils
                .sweep_stale_workers(SweepStaleWorkersOptions {
                    sweep_threshold: request.sweep_threshold_secs.map(Duration::from_secs),
                    recovery_delay: request.recovery_delay_secs.map(Duration::from_secs),
                    dry_run: request.dry_run,
                })
                .await
                .map_err(ApiError::internal)?;

            let message = if request.dry_run {
                if result.worker_ids.is_empty() {
                    "No stale workers found".to_string()
                } else {
                    format!(
                        "Would recover {} worker(s): {}",
                        result.worker_ids.len(),
                        result.worker_ids.join(", ")
                    )
                }
            } else if result.worker_ids.is_empty() {
                "No stale workers found".to_string()
            } else {
                format!(
                    "Recovered {} job(s) from {} worker(s): {}",
                    result.recovered_count,
                    result.worker_ids.len(),
                    result.worker_ids.join(", ")
                )
            };

            Ok(Json(MessageResponse { message }))
        }
    }
}