athena_rs 3.18.0

Hyper performant polyglot Database driver
Documentation
use actix_web::{HttpRequest, HttpResponse, web, web::Data, web::Json, web::Path};
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use uuid::Uuid;

use crate::AppState;
use crate::api::auth::authorize_static_admin_key;
use crate::api::client_context::required_client_pool;
use crate::api::response::{bad_request, internal_error, service_unavailable};

/// JSON envelope for the catalog listing response; the items are serialized
/// under a single `data` key.
#[derive(Debug, Serialize)]
struct S3CatalogListResponse {
    /// Catalog entries as returned by `athena_s3::list_s3_catalogs`.
    data: Vec<athena_s3::S3CatalogItem>,
}

/// JSON envelope for the credentials listing response; the items are
/// serialized under a single `data` key.
#[derive(Debug, Serialize)]
struct S3CredentialsListResponse {
    /// Credential entries as returned by `athena_s3::list_s3_credentials`.
    data: Vec<athena_s3::S3CredentialListItem>,
}

/// JSON body returned after a catalog entry has been deleted.
#[derive(Debug, Serialize)]
struct DeleteS3CatalogResponse {
    /// String form of the UUID that was deleted.
    id: String,
    /// Deletion flag; the delete handler only builds this response on success,
    /// with the value `true`.
    deleted: bool,
}

/// Request payload accepted by `create_s3_catalog`.
///
/// Fields without `#[serde(default)]` must be present in the JSON body.
/// Optional fields deserialize to `None` when the key is missing.
#[derive(Debug, Deserialize)]
struct CreateS3CatalogRequest {
    /// Required; trimmed by the handler and rejected when blank.
    name: String,
    /// Optional; the handler falls back to an empty string when absent or blank.
    #[serde(default)]
    description: Option<String>,
    /// Required; trimmed by the handler and rejected when blank.
    endpoint: String,
    /// Required; trimmed by the handler and rejected when blank.
    region: String,
    /// Required; trimmed by the handler and rejected when blank.
    bucket: String,
    /// Optional; the handler falls back to `"s3"` when absent or blank.
    #[serde(default)]
    provider: Option<String>,
    /// Optional; the handler falls back to `false` when absent.
    #[serde(default)]
    force_path_style: Option<bool>,
    /// Optional; blank values are treated as absent by the handler.
    #[serde(default)]
    default_prefix: Option<String>,
    /// Optional; blank values are treated as absent by the handler.
    #[serde(default)]
    public_base_url: Option<String>,
    /// Required; trimmed by the handler and rejected when blank.
    access_key_id: String,
    /// Required; trimmed by the handler and rejected when blank.
    secret_key: String,
    /// Optional; blank values are treated as absent by the handler.
    #[serde(default)]
    session_token: Option<String>,
    /// Optional; must be a JSON object when present. The handler substitutes
    /// an empty object when absent.
    #[serde(default)]
    metadata: Option<Value>,
}

/// Request payload accepted by `update_s3_catalog`.
///
/// Every field is optional and deserializes to `None` when the key is missing.
/// The handler forwards `None` to the store unchanged.
#[derive(Debug, Deserialize)]
struct UpdateS3CatalogRequest {
    /// When present, trimmed by the handler and rejected when blank.
    #[serde(default)]
    name: Option<String>,
    /// Blank values are treated as absent by the handler.
    #[serde(default)]
    description: Option<String>,
    /// When present, trimmed by the handler and rejected when blank.
    #[serde(default)]
    endpoint: Option<String>,
    /// When present, trimmed by the handler and rejected when blank.
    #[serde(default)]
    region: Option<String>,
    /// Blank values are treated as absent by the handler (not rejected).
    #[serde(default)]
    bucket: Option<String>,
    /// Blank values are treated as absent by the handler.
    #[serde(default)]
    provider: Option<String>,
    /// Forwarded to the store as-is.
    #[serde(default)]
    force_path_style: Option<bool>,
    /// Blank values are treated as absent by the handler.
    #[serde(default)]
    default_prefix: Option<String>,
    /// Blank values are treated as absent by the handler.
    #[serde(default)]
    public_base_url: Option<String>,
    /// Must be supplied together with `secret_key`; rejected when blank.
    #[serde(default)]
    access_key_id: Option<String>,
    /// Must be supplied together with `access_key_id`; rejected when blank.
    #[serde(default)]
    secret_key: Option<String>,
    /// Blank values are treated as absent by the handler.
    #[serde(default)]
    session_token: Option<String>,
    /// Forwarded to the store as-is.
    #[serde(default)]
    is_active: Option<bool>,
    /// Must be a JSON object when present; stays `None` when absent.
    #[serde(default)]
    metadata: Option<Value>,
}

/// Trims `value` and insists that something other than whitespace remains.
///
/// Returns the trimmed text as an owned `String`. When the trimmed value is
/// empty, returns a `bad_request` response whose messages mention `name`.
fn normalize_required_field(name: &str, value: &str) -> Result<String, HttpResponse> {
    match value.trim() {
        // Nothing left after trimming: report the offending field by name.
        "" => Err(bad_request(
            format!("Invalid {name}"),
            format!("{name} must not be empty"),
        )),
        cleaned => Ok(cleaned.to_owned()),
    }
}

/// Trims an optional string and collapses blank input to `None`.
///
/// Returns `Some(trimmed)` only when `value` is present and still has content
/// after surrounding whitespace is removed.
fn normalize_optional_field(value: Option<&str>) -> Option<String> {
    let cleaned = value?.trim();
    (!cleaned.is_empty()).then(|| cleaned.to_owned())
}

/// Ensures an optional metadata payload is a JSON object.
///
/// * Absent metadata becomes an empty JSON object.
/// * A JSON object is returned untouched.
/// * Any other JSON value yields a `bad_request` response that names
///   `field_name` and echoes the rejected value.
fn normalize_metadata_object(
    metadata: Option<Value>,
    field_name: &str,
) -> Result<Value, HttpResponse> {
    let Some(candidate) = metadata else {
        return Ok(json!({}));
    };

    if matches!(candidate, Value::Object(_)) {
        return Ok(candidate);
    }

    Err(bad_request(
        format!("Invalid {field_name}"),
        format!("{field_name} must be a JSON object, got {candidate}"),
    ))
}

/// Parses the `{id}` path segment as a UUID.
///
/// Any parse failure is reported as a `bad_request` response; the underlying
/// parse error is discarded.
fn parse_s3_id(raw_id: &str) -> Result<Uuid, HttpResponse> {
    match Uuid::parse_str(raw_id) {
        Ok(parsed) => Ok(parsed),
        Err(_) => Err(bad_request("Invalid s3 id", "path id must be a UUID")),
    }
}

fn map_store_error(
    err: athena_s3::S3CredentialStoreError,
    unavailable_message: &str,
    default_message: &str,
) -> HttpResponse {
    match err {
        athena_s3::S3CredentialStoreError::Database(sqlx::Error::Database(_)) => {
            service_unavailable(unavailable_message, err.to_string())
        }
        athena_s3::S3CredentialStoreError::NotFound => {
            crate::api::response::not_found("S3 catalog entry not found", err.to_string())
        }
        _ => internal_error(default_message, err.to_string()),
    }
}

/// Handler that lists S3 catalog entries (registered for `GET /catalogs`).
///
/// The request must pass `authorize_static_admin_key` and resolve a client
/// pool via `required_client_pool`; either failure is returned as-is. On
/// success the entries are returned as `200 OK` JSON wrapped in
/// `S3CatalogListResponse`. Store errors are translated by `map_store_error`.
async fn list_s3_catalogs(req: HttpRequest, state: Data<AppState>) -> HttpResponse {
    if let Err(denied) = authorize_static_admin_key(&req) {
        return denied;
    }

    let pool = match required_client_pool(&req, state.get_ref()).await {
        Err(failure) => return failure,
        Ok(resolved) => resolved,
    };

    athena_s3::list_s3_catalogs(&pool).await.map_or_else(
        |err| {
            map_store_error(
                err,
                "S3 catalog unavailable",
                "Failed to list S3 catalog entries",
            )
        },
        |data| HttpResponse::Ok().json(S3CatalogListResponse { data }),
    )
}

/// Handler that creates an S3 catalog entry (registered for `POST /catalogs`).
///
/// Steps, in order:
/// 1. Reject the request unless `authorize_static_admin_key` accepts it.
/// 2. Resolve the client pool via `required_client_pool`.
/// 3. Validate `metadata`: it must be a JSON object and defaults to `{}`.
/// 4. Trim `name`, `endpoint`, `region`, `bucket`, `access_key_id` and
///    `secret_key`, rejecting any that are blank.
/// 5. Call `athena_s3::create_s3_catalog` and return its result as `200 OK`
///    JSON; store errors are translated by `map_store_error`.
///
/// Optional text fields are trimmed, and blank values are treated as absent.
/// `description` then defaults to an empty string, `provider` to `"s3"` and
/// `force_path_style` to `false`.
async fn create_s3_catalog(
    req: HttpRequest,
    state: Data<AppState>,
    body: Json<CreateS3CatalogRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }

    let pool = match required_client_pool(&req, state.get_ref()).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };

    // Own the payload so `metadata` can be moved out instead of deep-cloned.
    let payload = body.into_inner();

    let metadata = match normalize_metadata_object(payload.metadata, "metadata") {
        Ok(value) => value,
        Err(resp) => return resp,
    };

    // All six validations run before the match; when several fields are
    // invalid, the first one in the order below decides the response.
    let input = match (
        normalize_required_field("name", &payload.name),
        normalize_required_field("endpoint", &payload.endpoint),
        normalize_required_field("region", &payload.region),
        normalize_required_field("bucket", &payload.bucket),
        normalize_required_field("access_key_id", &payload.access_key_id),
        normalize_required_field("secret_key", &payload.secret_key),
    ) {
        (Ok(name), Ok(endpoint), Ok(region), Ok(bucket), Ok(access_key_id), Ok(secret_key)) => {
            athena_s3::CreateS3CatalogInput {
                name,
                description: normalize_optional_field(payload.description.as_deref())
                    .unwrap_or_default(),
                endpoint,
                region,
                bucket: Some(bucket),
                provider: normalize_optional_field(payload.provider.as_deref())
                    .unwrap_or_else(|| "s3".to_string()),
                force_path_style: payload.force_path_style.unwrap_or(false),
                default_prefix: normalize_optional_field(payload.default_prefix.as_deref()),
                public_base_url: normalize_optional_field(payload.public_base_url.as_deref()),
                access_key_id,
                secret_key,
                session_token: normalize_optional_field(payload.session_token.as_deref()),
                metadata,
            }
        }
        (Err(resp), _, _, _, _, _)
        | (_, Err(resp), _, _, _, _)
        | (_, _, Err(resp), _, _, _)
        | (_, _, _, Err(resp), _, _)
        | (_, _, _, _, Err(resp), _)
        | (_, _, _, _, _, Err(resp)) => return resp,
    };

    match athena_s3::create_s3_catalog(&pool, &input).await {
        Ok(data) => HttpResponse::Ok().json(data),
        Err(err) => map_store_error(
            err,
            "S3 catalog unavailable",
            "Failed to create S3 catalog entry",
        ),
    }
}

/// Handler that updates an S3 catalog entry (registered for
/// `PATCH /catalogs/{id}`).
///
/// Steps, in order:
/// 1. Reject the request unless `authorize_static_admin_key` accepts it.
/// 2. Parse the `{id}` path segment as a UUID.
/// 3. Resolve the client pool via `required_client_pool`.
/// 4. Require `access_key_id` and `secret_key` to be both present or both
///    absent.
/// 5. Validate `metadata` when supplied (it must be a JSON object).
/// 6. Trim `name`, `endpoint`, `region`, `access_key_id` and `secret_key`
///    when supplied, rejecting blank values.
/// 7. Call `athena_s3::update_s3_catalog` and return its result as `200 OK`
///    JSON; store errors are translated by `map_store_error`.
///
/// The remaining text fields are trimmed, and blank values are forwarded as
/// `None`, exactly like fields missing from the payload.
async fn update_s3_catalog(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
    body: Json<UpdateS3CatalogRequest>,
) -> HttpResponse {
    if let Err(resp) = authorize_static_admin_key(&req) {
        return resp;
    }

    let s3_id = match parse_s3_id(&path) {
        Ok(id) => id,
        Err(resp) => return resp,
    };

    let pool = match required_client_pool(&req, state.get_ref()).await {
        Ok(pool) => pool,
        Err(resp) => return resp,
    };

    // Own the payload so `metadata` can be moved out instead of deep-cloned.
    let payload = body.into_inner();

    // Exactly one half of the key pair was supplied.
    if payload.access_key_id.is_some() != payload.secret_key.is_some() {
        return bad_request(
            "Invalid credential rotation payload",
            "access_key_id and secret_key must be provided together when rotating credentials",
        );
    }

    // Absent metadata must stay `None` here rather than defaulting to `{}`.
    // NOTE(review): presumably the store treats `None` as "leave unchanged" — confirm.
    let metadata = match payload.metadata {
        Some(value) => match normalize_metadata_object(Some(value), "metadata") {
            Ok(object) => Some(object),
            Err(resp) => return resp,
        },
        None => None,
    };

    // A field that is present must be non-blank; a missing field is fine.
    let normalize_optional_required =
        |name: &str, value: Option<&str>| -> Result<Option<String>, HttpResponse> {
            match value {
                Some(raw) => normalize_required_field(name, raw).map(Some),
                None => Ok(None),
            }
        };

    // All five validations run before the match; when several fields are
    // invalid, the first one in the order below decides the response.
    let input = match (
        normalize_optional_required("name", payload.name.as_deref()),
        normalize_optional_required("endpoint", payload.endpoint.as_deref()),
        normalize_optional_required("region", payload.region.as_deref()),
        normalize_optional_required("access_key_id", payload.access_key_id.as_deref()),
        normalize_optional_required("secret_key", payload.secret_key.as_deref()),
    ) {
        (Ok(name), Ok(endpoint), Ok(region), Ok(access_key_id), Ok(secret_key)) => {
            athena_s3::UpdateS3CatalogInput {
                name,
                description: normalize_optional_field(payload.description.as_deref()),
                endpoint,
                region,
                bucket: normalize_optional_field(payload.bucket.as_deref()),
                provider: normalize_optional_field(payload.provider.as_deref()),
                force_path_style: payload.force_path_style,
                default_prefix: normalize_optional_field(payload.default_prefix.as_deref()),
                public_base_url: normalize_optional_field(payload.public_base_url.as_deref()),
                access_key_id,
                secret_key,
                session_token: normalize_optional_field(payload.session_token.as_deref()),
                is_active: payload.is_active,
                metadata,
            }
        }
        (Err(resp), _, _, _, _)
        | (_, Err(resp), _, _, _)
        | (_, _, Err(resp), _, _)
        | (_, _, _, Err(resp), _)
        | (_, _, _, _, Err(resp)) => return resp,
    };

    match athena_s3::update_s3_catalog(&pool, s3_id, &input).await {
        Ok(data) => HttpResponse::Ok().json(data),
        Err(err) => map_store_error(
            err,
            "S3 catalog unavailable",
            "Failed to update S3 catalog entry",
        ),
    }
}

/// Handler that deletes an S3 catalog entry (registered for
/// `DELETE /catalogs/{id}`).
///
/// Checks the static admin key, parses the path id as a UUID, resolves the
/// client pool, then calls `athena_s3::delete_s3_catalog`. On success it
/// responds `200 OK` with a `DeleteS3CatalogResponse` echoing the id. Store
/// errors are translated by `map_store_error`.
async fn delete_s3_catalog(
    req: HttpRequest,
    state: Data<AppState>,
    path: Path<String>,
) -> HttpResponse {
    if let Err(denied) = authorize_static_admin_key(&req) {
        return denied;
    }

    let s3_id = match parse_s3_id(&path) {
        Err(invalid) => return invalid,
        Ok(parsed) => parsed,
    };

    let pool = match required_client_pool(&req, state.get_ref()).await {
        Err(failure) => return failure,
        Ok(resolved) => resolved,
    };

    athena_s3::delete_s3_catalog(&pool, s3_id).await.map_or_else(
        |err| {
            map_store_error(
                err,
                "S3 catalog unavailable",
                "Failed to delete S3 catalog entry",
            )
        },
        |()| {
            HttpResponse::Ok().json(DeleteS3CatalogResponse {
                id: s3_id.to_string(),
                deleted: true,
            })
        },
    )
}

/// Handler that lists the reusable S3 credentials registered in Athena's
/// control-plane catalog (registered for `GET /credentials`).
///
/// The request must pass `authorize_static_admin_key` and resolve a client
/// pool via `required_client_pool`; either failure is returned as-is. On
/// success the entries are returned as `200 OK` JSON wrapped in
/// `S3CredentialsListResponse`. Store errors are translated by
/// `map_store_error`.
async fn list_s3_credentials(req: HttpRequest, state: Data<AppState>) -> HttpResponse {
    if let Err(denied) = authorize_static_admin_key(&req) {
        return denied;
    }

    let pool = match required_client_pool(&req, state.get_ref()).await {
        Err(failure) => return failure,
        Ok(resolved) => resolved,
    };

    athena_s3::list_s3_credentials(&pool).await.map_or_else(
        |err| {
            map_store_error(
                err,
                "S3 credential store unavailable",
                "Failed to list S3 credentials",
            )
        },
        |data| HttpResponse::Ok().json(S3CredentialsListResponse { data }),
    )
}

/// Registers the S3 catalog and credential routes on `cfg`.
///
/// Routes, in registration order:
/// * `GET /catalogs` → `list_s3_catalogs`
/// * `POST /catalogs` → `create_s3_catalog`
/// * `GET /credentials` → `list_s3_credentials`
/// * `PATCH /catalogs/{id}` → `update_s3_catalog`
/// * `DELETE /catalogs/{id}` → `delete_s3_catalog`
pub fn configure_storage_routes(cfg: &mut web::ServiceConfig) {
    // Collection-level routes.
    cfg.route("/catalogs", web::get().to(list_s3_catalogs));
    cfg.route("/catalogs", web::post().to(create_s3_catalog));
    cfg.route("/credentials", web::get().to(list_s3_credentials));

    // Routes addressing a single catalog entry by id.
    cfg.route("/catalogs/{id}", web::patch().to(update_s3_catalog));
    cfg.route("/catalogs/{id}", web::delete().to(delete_s3_catalog));
}