use actix_web::{
HttpRequest, HttpResponse, get, post,
web::{self, Bytes, Data, Path},
};
use athena_control_plane::{CloneJobId, CloneJobRequest, CloneJobService, ControlPlaneError};
use serde_json::json;
use sqlx::postgres::PgPool;
use uuid::Uuid;
use crate::AppState;
use crate::api::auth::authorize_static_admin_key;
use crate::api::response::{
api_created, api_success, bad_request, conflict, internal_error, not_found, service_unavailable,
};
fn client_catalog_pool(state: &AppState) -> Result<PgPool, HttpResponse> {
let Some(client_name) = state.logging_client_name.as_ref() else {
return Err(service_unavailable(
"Client catalog unavailable",
"No athena_logging client is configured.",
));
};
state.pg_registry.get_pool(client_name).ok_or_else(|| {
service_unavailable(
"Client catalog unavailable",
format!("Logging client '{}' is not connected.", client_name),
)
})
}
async fn clone_job_service(state: &AppState) -> Result<CloneJobService, HttpResponse> {
let pool: PgPool = client_catalog_pool(state)?;
CloneJobService::new(pool)
.await
.map_err(map_control_plane_error)
}
fn map_control_plane_error(err: ControlPlaneError) -> HttpResponse {
match err {
ControlPlaneError::InvalidInput(message) => {
bad_request("Invalid clone job request", message)
}
ControlPlaneError::NotFound(message) => not_found("Clone job not found", message),
ControlPlaneError::Conflict(message) => conflict("Clone job conflict", message),
ControlPlaneError::Storage(message) => internal_error("Clone job storage failed", message),
}
}
#[post("/admin/provision/clones")]
pub async fn admin_create_clone_job(
req: HttpRequest,
body: Bytes,
state: Data<AppState>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let payload: CloneJobRequest = match serde_json::from_slice(&body) {
Ok(payload) => payload,
Err(err) => return bad_request("Invalid clone job payload", err.to_string()),
};
let service: CloneJobService = match clone_job_service(state.get_ref()).await {
Ok(service) => service,
Err(resp) => return resp,
};
match service.create_job(payload).await {
Ok(job) => api_created("Created clone job", job.detail()),
Err(err) => map_control_plane_error(err),
}
}
#[get("/admin/provision/clones")]
pub async fn admin_list_clone_jobs(req: HttpRequest, state: Data<AppState>) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let service: CloneJobService = match clone_job_service(state.get_ref()).await {
Ok(service) => service,
Err(resp) => return resp,
};
match service.list_jobs().await {
Ok(jobs) => api_success("Loaded clone jobs", json!({ "jobs": jobs })),
Err(err) => map_control_plane_error(err),
}
}
#[get("/admin/provision/clones/{job_id}")]
pub async fn admin_get_clone_job(
req: HttpRequest,
path: Path<Uuid>,
state: Data<AppState>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let service: CloneJobService = match clone_job_service(state.get_ref()).await {
Ok(service) => service,
Err(resp) => return resp,
};
let job_id = CloneJobId(path.into_inner());
let job = match service.get_job(job_id).await {
Ok(Some(job)) => job,
Ok(None) => {
return not_found(
"Clone job not found",
"No clone job exists for the requested id",
);
}
Err(err) => return map_control_plane_error(err),
};
let events = match service.list_job_events(job_id).await {
Ok(events) => events,
Err(err) => return map_control_plane_error(err),
};
api_success("Loaded clone job", json!({ "job": job, "events": events }))
}
#[post("/admin/provision/clones/{job_id}/cancel")]
pub async fn admin_cancel_clone_job(
req: HttpRequest,
path: Path<Uuid>,
state: Data<AppState>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let service: CloneJobService = match clone_job_service(state.get_ref()).await {
Ok(service) => service,
Err(resp) => return resp,
};
match service.cancel_job(CloneJobId(path.into_inner())).await {
Ok(job) => api_success("Requested clone job cancellation", job.detail()),
Err(err) => map_control_plane_error(err),
}
}
#[post("/admin/provision/clones/{job_id}/retry")]
pub async fn admin_retry_clone_job(
req: HttpRequest,
path: Path<Uuid>,
state: Data<AppState>,
) -> HttpResponse {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let service: CloneJobService = match clone_job_service(state.get_ref()).await {
Ok(service) => service,
Err(resp) => return resp,
};
match service.retry_job(CloneJobId(path.into_inner())).await {
Ok(job) => api_success("Requeued clone job", job.detail()),
Err(err) => map_control_plane_error(err),
}
}
pub(super) fn register(cfg: &mut web::ServiceConfig) {
cfg.service(admin_create_clone_job)
.service(admin_list_clone_jobs)
.service(admin_get_clone_job)
.service(admin_cancel_clone_job)
.service(admin_retry_clone_job);
}