use actix_web::{HttpRequest, HttpResponse, web, web::Data, web::Json, web::Path};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use uuid::Uuid;
use crate::AppState;
use crate::api::auth::authorize_static_admin_key;
use crate::api::client_context::required_client_pool;
use crate::api::response::{bad_request, internal_error, service_unavailable};
/// JSON body returned by `list_s3_catalogs`: the catalog items wrapped in a
/// single `data` field.
#[derive(Debug, Serialize)]
struct S3CatalogListResponse {
    /// Items produced by `athena_s3::list_s3_catalogs`, passed through unchanged.
    data: Vec<athena_s3::S3CatalogItem>,
}
/// JSON body returned by `list_s3_credentials`: the credential list items
/// wrapped in a single `data` field.
#[derive(Debug, Serialize)]
struct S3CredentialsListResponse {
    /// Items produced by `athena_s3::list_s3_credentials`, passed through unchanged.
    data: Vec<athena_s3::S3CredentialListItem>,
}
/// JSON body returned by `delete_s3_catalog` after the store reports success.
#[derive(Debug, Serialize)]
struct DeleteS3CatalogResponse {
    /// String form of the UUID that was parsed from the request path.
    id: String,
    /// Set to `true` by `delete_s3_catalog` on the success path.
    deleted: bool,
}
/// Request body accepted by `create_s3_catalog`.
///
/// Fields without `#[serde(default)]` must be present in the payload; the
/// handler additionally rejects them when they are blank after trimming.
///
/// NOTE(review): `Debug` is derived, so formatting this struct with `{:?}`
/// prints `secret_key` and `session_token` in clear text. Avoid logging it.
#[derive(Debug, Deserialize)]
struct CreateS3CatalogRequest {
    /// Required; must not be blank.
    name: String,
    /// Optional; the handler substitutes an empty string when absent or blank.
    #[serde(default)]
    description: Option<String>,
    /// Required; must not be blank.
    endpoint: String,
    /// Required; must not be blank.
    region: String,
    /// Required; must not be blank.
    bucket: String,
    /// Optional; the handler falls back to `"s3"` when absent or blank.
    #[serde(default)]
    provider: Option<String>,
    /// Optional; the handler falls back to `false` when absent.
    #[serde(default)]
    force_path_style: Option<bool>,
    /// Optional; blank values are treated as absent.
    #[serde(default)]
    default_prefix: Option<String>,
    /// Optional; blank values are treated as absent.
    #[serde(default)]
    public_base_url: Option<String>,
    /// Required; must not be blank.
    access_key_id: String,
    /// Required; must not be blank.
    secret_key: String,
    /// Optional; blank values are treated as absent.
    #[serde(default)]
    session_token: Option<String>,
    /// Optional; must be a JSON object when present. The handler uses `{}`
    /// when absent.
    #[serde(default)]
    metadata: Option<Value>,
}
/// Request body accepted by `update_s3_catalog`.
///
/// Every field is optional and deserializes to `None` when missing. The
/// handler forwards `None` to the store layer for fields that were not
/// supplied.
///
/// NOTE(review): `Debug` is derived, so formatting this struct with `{:?}`
/// prints `secret_key` and `session_token` in clear text. Avoid logging it.
#[derive(Debug, Deserialize)]
struct UpdateS3CatalogRequest {
    /// When present, must not be blank.
    #[serde(default)]
    name: Option<String>,
    /// Blank values are treated as absent.
    #[serde(default)]
    description: Option<String>,
    /// When present, must not be blank.
    #[serde(default)]
    endpoint: Option<String>,
    /// When present, must not be blank.
    #[serde(default)]
    region: Option<String>,
    /// Blank values are treated as absent (not rejected).
    #[serde(default)]
    bucket: Option<String>,
    /// Blank values are treated as absent.
    #[serde(default)]
    provider: Option<String>,
    /// Forwarded to the store layer as-is.
    #[serde(default)]
    force_path_style: Option<bool>,
    /// Blank values are treated as absent.
    #[serde(default)]
    default_prefix: Option<String>,
    /// Blank values are treated as absent.
    #[serde(default)]
    public_base_url: Option<String>,
    /// Must be supplied together with `secret_key`; when present, must not be blank.
    #[serde(default)]
    access_key_id: Option<String>,
    /// Must be supplied together with `access_key_id`; when present, must not be blank.
    #[serde(default)]
    secret_key: Option<String>,
    /// Blank values are treated as absent.
    #[serde(default)]
    session_token: Option<String>,
    /// Forwarded to the store layer as-is.
    #[serde(default)]
    is_active: Option<bool>,
    /// When present, must be a JSON object.
    #[serde(default)]
    metadata: Option<Value>,
}
/// Trims `value` and returns it as an owned string.
///
/// # Errors
///
/// Returns a `bad_request` response naming the field `name` when the trimmed
/// value is empty.
fn normalize_required_field(name: &str, value: &str) -> Result<String, HttpResponse> {
    match value.trim() {
        // Nothing but whitespace (or nothing at all) was supplied.
        "" => Err(bad_request(
            format!("Invalid {name}"),
            format!("{name} must not be empty"),
        )),
        cleaned => Ok(cleaned.to_string()),
    }
}
/// Trims an optional string and returns it as an owned value.
///
/// Yields `None` when the input is `None` or when nothing remains after
/// trimming surrounding whitespace.
fn normalize_optional_field(value: Option<&str>) -> Option<String> {
    let cleaned = value?.trim();
    if cleaned.is_empty() {
        None
    } else {
        Some(cleaned.to_string())
    }
}
/// Ensures an optional metadata value is a JSON object.
///
/// A missing value becomes an empty object (`{}`); an object is returned
/// unchanged.
///
/// # Errors
///
/// Returns a `bad_request` response naming `field_name` (and echoing the
/// offending value) when the value is any other JSON type.
fn normalize_metadata_object(
    metadata: Option<Value>,
    field_name: &str,
) -> Result<Value, HttpResponse> {
    let candidate = metadata.unwrap_or_else(|| json!({}));
    if candidate.is_object() {
        return Ok(candidate);
    }
    Err(bad_request(
        format!("Invalid {field_name}"),
        format!("{field_name} must be a JSON object, got {candidate}"),
    ))
}
/// Parses the `{id}` path segment as a UUID.
///
/// # Errors
///
/// Returns a `bad_request` response when `raw_id` is not a valid UUID; the
/// underlying parse error is not exposed.
fn parse_s3_id(raw_id: &str) -> Result<Uuid, HttpResponse> {
    match Uuid::parse_str(raw_id) {
        Ok(id) => Ok(id),
        Err(_) => Err(bad_request("Invalid s3 id", "path id must be a UUID")),
    }
}
fn map_store_error(
err: athena_s3::S3CredentialStoreError,
unavailable_message: &str,
default_message: &str,
) -> HttpResponse {
match err {
athena_s3::S3CredentialStoreError::Database(sqlx::Error::Database(_)) => {
service_unavailable(unavailable_message, err.to_string())
}
athena_s3::S3CredentialStoreError::NotFound => {
crate::api::response::not_found("S3 catalog entry not found", err.to_string())
}
_ => internal_error(default_message, err.to_string()),
}
}
/// Handler registered for `GET /catalogs` by `configure_storage_routes`.
///
/// Runs the admin-key check, resolves the client pool for the request, then
/// returns `200 OK` with the catalog items under a `data` field. Any earlier
/// failure is returned as the response produced by the failing step.
async fn list_s3_catalogs(req: HttpRequest, state: Data<AppState>) -> HttpResponse {
    if let Err(denied) = authorize_static_admin_key(&req) {
        return denied;
    }

    let pool = match required_client_pool(&req, state.get_ref()).await {
        Err(early) => return early,
        Ok(handle) => handle,
    };

    athena_s3::list_s3_catalogs(&pool)
        .await
        .map(|data| HttpResponse::Ok().json(S3CatalogListResponse { data }))
        .unwrap_or_else(|err| {
            map_store_error(
                err,
                "S3 catalog unavailable",
                "Failed to list S3 catalog entries",
            )
        })
}
/// Validates a create payload and converts it into the store-layer input.
///
/// Checks run in a fixed order and stop at the first failure: `metadata`
/// (must be a JSON object when present), then `name`, `endpoint`, `region`,
/// `bucket`, `access_key_id`, `secret_key` (each must be non-blank after
/// trimming). Optional string fields are trimmed and treated as absent when
/// blank.
///
/// # Errors
///
/// Returns the `bad_request` response produced by the first failing check.
fn build_create_s3_catalog_input(
    payload: CreateS3CatalogRequest,
) -> Result<athena_s3::CreateS3CatalogInput, HttpResponse> {
    let metadata = normalize_metadata_object(payload.metadata, "metadata")?;

    let name = normalize_required_field("name", &payload.name)?;
    let endpoint = normalize_required_field("endpoint", &payload.endpoint)?;
    let region = normalize_required_field("region", &payload.region)?;
    let bucket = normalize_required_field("bucket", &payload.bucket)?;
    let access_key_id = normalize_required_field("access_key_id", &payload.access_key_id)?;
    let secret_key = normalize_required_field("secret_key", &payload.secret_key)?;

    Ok(athena_s3::CreateS3CatalogInput {
        name,
        // A missing or blank description is stored as an empty string.
        description: normalize_optional_field(payload.description.as_deref())
            .unwrap_or_default(),
        endpoint,
        region,
        bucket: Some(bucket),
        // The provider falls back to "s3" when not supplied.
        provider: normalize_optional_field(payload.provider.as_deref())
            .unwrap_or_else(|| "s3".to_string()),
        force_path_style: payload.force_path_style.unwrap_or(false),
        default_prefix: normalize_optional_field(payload.default_prefix.as_deref()),
        public_base_url: normalize_optional_field(payload.public_base_url.as_deref()),
        access_key_id,
        secret_key,
        session_token: normalize_optional_field(payload.session_token.as_deref()),
        metadata,
    })
}

/// Handler registered for `POST /catalogs` by `configure_storage_routes`.
///
/// Steps, in order: admin-key check, client pool lookup, payload validation,
/// then `athena_s3::create_s3_catalog`. On success the value returned by the
/// store is sent back as JSON with `200 OK`. A failure at any step is returned
/// as the response produced by that step; store errors go through
/// `map_store_error`.
async fn create_s3_catalog(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<CreateS3CatalogRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };

    // Take ownership of the payload so validation can move fields out of it
    // instead of cloning them.
    let input = match build_create_s3_catalog_input(body.into_inner()) {
        Ok(input) => input,
        Err(resp) => return resp,
    };

    match athena_s3::create_s3_catalog(&pool, &input).await {
        Ok(data) => HttpResponse::Ok().json(data),
        Err(err) => map_store_error(
            err,
            "S3 catalog unavailable",
            "Failed to create S3 catalog entry",
        ),
    }
}
/// Validates an update payload and converts it into the store-layer input.
///
/// Checks run in a fixed order and stop at the first failure:
///
/// 1. `access_key_id` and `secret_key` must be both present or both absent.
/// 2. `metadata`, when present, must be a JSON object.
/// 3. `name`, `endpoint`, `region`, `access_key_id`, `secret_key`, when
///    present, must be non-blank after trimming.
///
/// The remaining optional strings are trimmed and treated as absent when
/// blank.
///
/// NOTE(review): a blank `bucket` is silently dropped here, whereas the create
/// path rejects a blank bucket. Confirm this asymmetry is intended.
///
/// # Errors
///
/// Returns the `bad_request` response produced by the first failing check.
fn build_update_s3_catalog_input(
    payload: UpdateS3CatalogRequest,
) -> Result<athena_s3::UpdateS3CatalogInput, HttpResponse> {
    if payload.access_key_id.is_some() != payload.secret_key.is_some() {
        return Err(bad_request(
            "Invalid credential rotation payload",
            "access_key_id and secret_key must be provided together when rotating credentials",
        ));
    }

    // Only validate metadata when the caller actually sent it; `None` means
    // "leave unchanged" and must not be replaced by an empty object.
    let metadata = payload
        .metadata
        .map(|value| normalize_metadata_object(Some(value), "metadata"))
        .transpose()?;

    // A field that is absent stays `None`; a field that is present must
    // survive the same non-blank check used on the create path.
    let required_if_present =
        |field: &str, value: Option<&str>| -> Result<Option<String>, HttpResponse> {
            value
                .map(|raw| normalize_required_field(field, raw))
                .transpose()
        };

    let name = required_if_present("name", payload.name.as_deref())?;
    let endpoint = required_if_present("endpoint", payload.endpoint.as_deref())?;
    let region = required_if_present("region", payload.region.as_deref())?;
    let access_key_id = required_if_present("access_key_id", payload.access_key_id.as_deref())?;
    let secret_key = required_if_present("secret_key", payload.secret_key.as_deref())?;

    Ok(athena_s3::UpdateS3CatalogInput {
        name,
        description: normalize_optional_field(payload.description.as_deref()),
        endpoint,
        region,
        bucket: normalize_optional_field(payload.bucket.as_deref()),
        provider: normalize_optional_field(payload.provider.as_deref()),
        force_path_style: payload.force_path_style,
        default_prefix: normalize_optional_field(payload.default_prefix.as_deref()),
        public_base_url: normalize_optional_field(payload.public_base_url.as_deref()),
        access_key_id,
        secret_key,
        session_token: normalize_optional_field(payload.session_token.as_deref()),
        is_active: payload.is_active,
        metadata,
    })
}

/// Handler registered for `PATCH /catalogs/{id}` by `configure_storage_routes`.
///
/// Steps, in order: admin-key check, UUID parsing of the path id, client pool
/// lookup, payload validation, then `athena_s3::update_s3_catalog`. On success
/// the value returned by the store is sent back as JSON with `200 OK`. A
/// failure at any step is returned as the response produced by that step;
/// store errors go through `map_store_error`.
async fn update_s3_catalog(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    body: Json<UpdateS3CatalogRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }
    let s3_id = match parse_s3_id(&path) {
        Ok(id) => id,
        Err(resp) => return resp,
    };
    let pool = match required_client_pool(&req, state.get_ref()).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };

    // Take ownership of the payload so validation can move fields out of it
    // instead of cloning them.
    let input = match build_update_s3_catalog_input(body.into_inner()) {
        Ok(input) => input,
        Err(resp) => return resp,
    };

    match athena_s3::update_s3_catalog(&pool, s3_id, &input).await {
        Ok(data) => HttpResponse::Ok().json(data),
        Err(err) => map_store_error(
            err,
            "S3 catalog unavailable",
            "Failed to update S3 catalog entry",
        ),
    }
}
/// Handler registered for `DELETE /catalogs/{id}` by `configure_storage_routes`.
///
/// Runs the admin-key check, parses the path id as a UUID, resolves the client
/// pool, then asks the store to delete the entry. On success it returns
/// `200 OK` with the id and `deleted: true`; store errors go through
/// `map_store_error`.
async fn delete_s3_catalog(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
) -> HttpResponse {
    if let Err(denied) = authorize_static_admin_key(&req) {
        return denied;
    }

    let s3_id = match parse_s3_id(&path) {
        Err(invalid) => return invalid,
        Ok(parsed) => parsed,
    };

    let pool = match required_client_pool(&req, state.get_ref()).await {
        Err(early) => return early,
        Ok(handle) => handle,
    };

    if let Err(err) = athena_s3::delete_s3_catalog(&pool, s3_id).await {
        return map_store_error(
            err,
            "S3 catalog unavailable",
            "Failed to delete S3 catalog entry",
        );
    }

    HttpResponse::Ok().json(DeleteS3CatalogResponse {
        id: s3_id.to_string(),
        deleted: true,
    })
}
/// Handler registered for `GET /credentials` by `configure_storage_routes`.
///
/// Runs the admin-key check, resolves the client pool for the request, then
/// returns `200 OK` with the credential list items under a `data` field. Store
/// errors go through `map_store_error`.
async fn list_s3_credentials(req: HttpRequest, state: Data<AppState>) -> HttpResponse {
    if let Err(denied) = authorize_static_admin_key(&req) {
        return denied;
    }

    let pool = match required_client_pool(&req, state.get_ref()).await {
        Err(early) => return early,
        Ok(handle) => handle,
    };

    athena_s3::list_s3_credentials(&pool)
        .await
        .map(|data| HttpResponse::Ok().json(S3CredentialsListResponse { data }))
        .unwrap_or_else(|err| {
            map_store_error(
                err,
                "S3 credential store unavailable",
                "Failed to list S3 credentials",
            )
        })
}
/// Registers the S3 catalog and credential routes on `cfg`.
///
/// Paths are relative to wherever the caller mounts this configuration:
///
/// * `GET /catalogs` → `list_s3_catalogs`
/// * `POST /catalogs` → `create_s3_catalog`
/// * `GET /credentials` → `list_s3_credentials`
/// * `PATCH /catalogs/{id}` → `update_s3_catalog`
/// * `DELETE /catalogs/{id}` → `delete_s3_catalog`
pub fn configure_storage_routes(cfg: &mut web::ServiceConfig) {
    // Registration order is kept as-is: collection routes, credentials, then
    // the per-entry routes.
    cfg.route("/catalogs", web::get().to(list_s3_catalogs));
    cfg.route("/catalogs", web::post().to(create_s3_catalog));
    cfg.route("/credentials", web::get().to(list_s3_credentials));
    cfg.route("/catalogs/{id}", web::patch().to(update_s3_catalog));
    cfg.route("/catalogs/{id}", web::delete().to(delete_s3_catalog));
}