athena_rs 3.22.1

Hyper performant polyglot Database driver
Documentation
use actix_web::{
    HttpRequest, HttpResponse, get, post,
    web::{self, Bytes, Data, Path},
};
use athena_control_plane::{CloneJobId, CloneJobRequest, CloneJobService, ControlPlaneError};
use serde_json::json;
use sqlx::postgres::PgPool;
use uuid::Uuid;

use crate::AppState;
use crate::api::auth::authorize_static_admin_key;
use crate::api::response::{
    api_created, api_success, bad_request, conflict, internal_error, not_found, service_unavailable,
};

fn client_catalog_pool(state: &AppState) -> Result<PgPool, HttpResponse> {
    let Some(client_name) = state.logging_client_name.as_ref() else {
        return Err(service_unavailable(
            "Client catalog unavailable",
            "No athena_logging client is configured.",
        ));
    };

    state.pg_registry.get_pool(client_name).ok_or_else(|| {
        service_unavailable(
            "Client catalog unavailable",
            format!("Logging client '{}' is not connected.", client_name),
        )
    })
}

async fn clone_job_service(state: &AppState) -> Result<CloneJobService, HttpResponse> {
    let pool: PgPool = client_catalog_pool(state)?;
    CloneJobService::new(pool)
        .await
        .map_err(map_control_plane_error)
}

fn map_control_plane_error(err: ControlPlaneError) -> HttpResponse {
    match err {
        ControlPlaneError::InvalidInput(message) => {
            bad_request("Invalid clone job request", message)
        }
        ControlPlaneError::NotFound(message) => not_found("Clone job not found", message),
        ControlPlaneError::Conflict(message) => conflict("Clone job conflict", message),
        ControlPlaneError::Storage(message) => internal_error("Clone job storage failed", message),
    }
}

#[post("/admin/provision/clones")]
pub async fn admin_create_clone_job(
    req: HttpRequest,
    body: Bytes,
    state: Data<AppState>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }

    let payload: CloneJobRequest = match serde_json::from_slice(&body) {
        Ok(payload) => payload,
        Err(err) => return bad_request("Invalid clone job payload", err.to_string()),
    };

    let service: CloneJobService = match clone_job_service(state.get_ref()).await {
        Ok(service) => service,
        Err(resp) => return resp,
    };

    match service.create_job(payload).await {
        Ok(job) => api_created("Created clone job", job.detail()),
        Err(err) => map_control_plane_error(err),
    }
}

#[get("/admin/provision/clones")]
pub async fn admin_list_clone_jobs(req: HttpRequest, state: Data<AppState>) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }

    let service: CloneJobService = match clone_job_service(state.get_ref()).await {
        Ok(service) => service,
        Err(resp) => return resp,
    };

    match service.list_jobs().await {
        Ok(jobs) => api_success("Loaded clone jobs", json!({ "jobs": jobs })),
        Err(err) => map_control_plane_error(err),
    }
}

#[get("/admin/provision/clones/{job_id}")]
pub async fn admin_get_clone_job(
    req: HttpRequest,
    path: Path<Uuid>,
    state: Data<AppState>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }

    let service: CloneJobService = match clone_job_service(state.get_ref()).await {
        Ok(service) => service,
        Err(resp) => return resp,
    };
    let job_id = CloneJobId(path.into_inner());

    let job = match service.get_job(job_id).await {
        Ok(Some(job)) => job,
        Ok(None) => {
            return not_found(
                "Clone job not found",
                "No clone job exists for the requested id",
            );
        }
        Err(err) => return map_control_plane_error(err),
    };
    let events = match service.list_job_events(job_id).await {
        Ok(events) => events,
        Err(err) => return map_control_plane_error(err),
    };

    api_success("Loaded clone job", json!({ "job": job, "events": events }))
}

#[post("/admin/provision/clones/{job_id}/cancel")]
pub async fn admin_cancel_clone_job(
    req: HttpRequest,
    path: Path<Uuid>,
    state: Data<AppState>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }

    let service: CloneJobService = match clone_job_service(state.get_ref()).await {
        Ok(service) => service,
        Err(resp) => return resp,
    };

    match service.cancel_job(CloneJobId(path.into_inner())).await {
        Ok(job) => api_success("Requested clone job cancellation", job.detail()),
        Err(err) => map_control_plane_error(err),
    }
}

#[post("/admin/provision/clones/{job_id}/retry")]
pub async fn admin_retry_clone_job(
    req: HttpRequest,
    path: Path<Uuid>,
    state: Data<AppState>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }

    let service: CloneJobService = match clone_job_service(state.get_ref()).await {
        Ok(service) => service,
        Err(resp) => return resp,
    };

    match service.retry_job(CloneJobId(path.into_inner())).await {
        Ok(job) => api_success("Requeued clone job", job.detail()),
        Err(err) => map_control_plane_error(err),
    }
}

pub(super) fn register(cfg: &mut web::ServiceConfig) {
    cfg.service(admin_create_clone_job)
        .service(admin_list_clone_jobs)
        .service(admin_get_clone_job)
        .service(admin_cancel_clone_job)
        .service(admin_retry_clone_job);
}