use actix_web::{HttpRequest, HttpResponse, get, web};
use serde::Deserialize;
use serde_json::{Map, Value, json};
use std::env;
use std::time::Instant;
use tokio::time::timeout;
use uuid::Uuid;
use crate::AppState;
use crate::api::debug::shared::{
DEBUG_JWT_SECRET_ENV, DEBUG_PROBE_TIMEOUT, DEBUG_S3_ENDPOINT_PATH, PrivilegedDebugAccess,
RelationResolutionSpec, build_debug_payload, build_origin_policy_debug,
build_privileged_debug_payload, build_relation_resolution_rows, build_request_transport_debug,
computed_base_url, evaluate_privileged_debug_access, header_value_trimmed,
normalize_tenant_header_value, probe_pg_context,
};
use crate::api::gateway::auth::{read_right_for_resource, require_admin_or_gateway};
use crate::api::headers::request_context::resolved_athena_client;
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::storage::service::build_s3_client_with_session_token;
use crate::api::storage::validation::{
validate_bucket_name, validate_region, validate_storage_endpoint,
};
use crate::athena::postgres_clients::ensure_catalog_database_client_loaded;
use crate::drivers::postgresql::sqlx_driver::RegisteredClient;
const S3_DEBUG_ENV_KEYS: &[&str] = &[
"ATHENA_KEY_12",
DEBUG_JWT_SECRET_ENV,
"JWT_SECRET",
"ATHENA_STORAGE_ALLOW_HTTP",
"ATHENA_CORS_ALLOWED_ORIGINS",
];
#[derive(Debug, Deserialize)]
pub struct S3DebugQuery {
#[serde(default)]
jwt_secret: Option<String>,
#[serde(default)]
s3_id: Option<String>,
#[serde(default)]
name: Option<String>,
#[serde(default)]
bucket: Option<String>,
}
#[derive(Debug)]
struct S3Resolution {
configured_header_value: Option<String>,
resolved_tenant: Option<String>,
resolution_status: &'static str,
resolution_mode: Option<&'static str>,
resolution_error: Option<String>,
fallback_header_values: Value,
metadata_client: Option<RegisteredClient>,
metadata_pool: Option<sqlx::PgPool>,
}
#[derive(Debug)]
struct SelectedCatalog {
catalog: athena_s3::S3CatalogItem,
selection_mode: &'static str,
}
#[derive(Debug)]
struct CatalogSelection {
selected: Option<SelectedCatalog>,
status: &'static str,
error: Option<String>,
}
#[get("/debug/s3")]
pub async fn debug_s3(
req: HttpRequest,
app_state: web::Data<AppState>,
query: web::Query<S3DebugQuery>,
) -> HttpResponse {
if let Err(resp) = require_admin_or_gateway(
&req,
app_state.get_ref(),
None,
vec![read_right_for_resource(None)],
)
.await
{
return resp;
}
let privileged_access = evaluate_privileged_debug_access(query.jwt_secret.as_deref());
let resolution = resolve_s3_target(&req, app_state.get_ref()).await;
let (tenant_configs, selection) = build_s3_tenant_configs(&resolution, &query).await;
let current_tenant_probe = build_s3_probe(&resolution, &selection).await;
let probe_status = current_tenant_probe
.get("status")
.and_then(Value::as_str)
.unwrap_or("unknown");
let runtime = build_s3_runtime(
&req,
app_state.get_ref(),
&privileged_access,
tenant_configs.get("known_catalogs"),
);
let request = build_s3_request_payload(&req, app_state.get_ref(), &resolution, &query);
let privileged_debug = build_privileged_debug_payload(
app_state.get_ref(),
&privileged_access,
S3_DEBUG_ENV_KEYS,
json!({
"request_base_url": computed_base_url(&req),
"storage_allow_http": storage_allow_http(),
}),
);
HttpResponse::Ok().json(build_debug_payload(
"athena-s3",
DEBUG_S3_ENDPOINT_PATH,
top_level_status(resolution.resolution_status, probe_status),
privileged_debug,
runtime,
request,
tenant_configs,
current_tenant_probe,
))
}
pub fn services(cfg: &mut web::ServiceConfig) {
cfg.service(debug_s3);
}
async fn resolve_s3_target(req: &HttpRequest, app_state: &AppState) -> S3Resolution {
let configured_header_value = header_value_trimmed(req, "x-athena-client");
let normalized_client = match normalize_tenant_header_value(Some(x_athena_client(req))) {
Ok(value) => value,
Err(error) => {
return S3Resolution {
configured_header_value,
resolved_tenant: None,
resolution_status: "invalid_header",
resolution_mode: Some("x-athena-client"),
resolution_error: Some(error),
fallback_header_values: json!({
"resolved_request_context_client": resolved_athena_client(req),
"host_header": header_value_trimmed(req, "host"),
"x_forwarded_host": header_value_trimmed(req, "x-forwarded-host"),
"x_forwarded_proto": header_value_trimmed(req, "x-forwarded-proto"),
}),
metadata_client: None,
metadata_pool: None,
};
}
};
let Some(client_name) = normalized_client else {
return S3Resolution {
configured_header_value,
resolved_tenant: None,
resolution_status: "unknown_tenant",
resolution_mode: None,
resolution_error: Some(
"no X-Athena-Client header or wildcard host client context was present".to_string(),
),
fallback_header_values: json!({
"resolved_request_context_client": resolved_athena_client(req),
"host_header": header_value_trimmed(req, "host"),
"x_forwarded_host": header_value_trimmed(req, "x-forwarded-host"),
"x_forwarded_proto": header_value_trimmed(req, "x-forwarded-proto"),
}),
metadata_client: None,
metadata_pool: None,
};
};
let metadata_error = ensure_catalog_database_client_loaded(app_state, &client_name)
.await
.err();
let metadata_client = app_state.pg_registry.registered_client(&client_name);
let metadata_pool = app_state.pg_registry.get_pool(&client_name);
if metadata_client.is_none() {
return S3Resolution {
configured_header_value,
resolved_tenant: Some(client_name),
resolution_status: "unknown_tenant",
resolution_mode: Some("x-athena-client"),
resolution_error: metadata_error.or_else(|| {
Some(
"metadata client is not present in the live Postgres registry or catalog"
.to_string(),
)
}),
fallback_header_values: json!({
"resolved_request_context_client": resolved_athena_client(req),
"host_header": header_value_trimmed(req, "host"),
"x_forwarded_host": header_value_trimmed(req, "x-forwarded-host"),
"x_forwarded_proto": header_value_trimmed(req, "x-forwarded-proto"),
}),
metadata_client: None,
metadata_pool: None,
};
}
S3Resolution {
configured_header_value,
resolved_tenant: Some(client_name),
resolution_status: "ok",
resolution_mode: Some("metadata_client"),
resolution_error: metadata_error,
fallback_header_values: json!({
"resolved_request_context_client": resolved_athena_client(req),
"host_header": header_value_trimmed(req, "host"),
"x_forwarded_host": header_value_trimmed(req, "x-forwarded-host"),
"x_forwarded_proto": header_value_trimmed(req, "x-forwarded-proto"),
}),
metadata_client,
metadata_pool,
}
}
async fn build_s3_tenant_configs(
resolution: &S3Resolution,
query: &S3DebugQuery,
) -> (Value, CatalogSelection) {
let Some(pool) = &resolution.metadata_pool else {
return (
json!({
"header_name": "X-Athena-Client",
"resolved_metadata_client": resolution.resolved_tenant,
"default_tenant": Value::Null,
"known_catalogs": Vec::<Value>::new(),
"catalog_status": if resolution.resolution_status == "ok" { "metadata_unavailable" } else { resolution.resolution_status },
"catalog_error": resolution.resolution_error,
"selected_catalog": Value::Null,
}),
CatalogSelection {
selected: None,
status: if resolution.resolution_status == "ok" {
"metadata_unavailable"
} else {
resolution.resolution_status
},
error: resolution.resolution_error.clone(),
},
);
};
match athena_s3::list_s3_catalogs(pool).await {
Ok(catalogs) => {
let selection = select_catalog(&catalogs, query);
let default_tenant = if catalogs.len() == 1 {
catalogs.first().map(|catalog| catalog.id.clone())
} else {
None
};
(
json!({
"header_name": "X-Athena-Client",
"resolved_metadata_client": resolution.resolved_tenant,
"default_tenant": default_tenant,
"known_catalogs": catalogs.iter().map(mask_catalog_for_debug).collect::<Vec<_>>(),
"catalog_status": selection.status,
"catalog_error": selection.error,
"selected_catalog": selection.selected.as_ref().map(|selected| mask_catalog_for_debug(&selected.catalog)),
}),
selection,
)
}
Err(error) => (
json!({
"header_name": "X-Athena-Client",
"resolved_metadata_client": resolution.resolved_tenant,
"default_tenant": Value::Null,
"known_catalogs": Vec::<Value>::new(),
"catalog_status": "probe_failed",
"catalog_error": error.to_string(),
"selected_catalog": Value::Null,
}),
CatalogSelection {
selected: None,
status: "probe_failed",
error: Some(error.to_string()),
},
),
}
}
fn build_s3_runtime(
req: &HttpRequest,
app_state: &AppState,
access: &PrivilegedDebugAccess,
known_catalogs: Option<&Value>,
) -> Value {
json!({
"tenant_header_name": "X-Athena-Client",
"debug_endpoint": DEBUG_S3_ENDPOINT_PATH,
"request_base_url": computed_base_url(req),
"enabled_features": {
"storage_allow_http": storage_allow_http(),
"cors_allow_any_origin": app_state.cors_allow_any_origin,
"cors_allowed_origin_count": app_state.cors_allowed_origins.len(),
"privileged_debug_available": access.expected_secret_source.is_some(),
},
"route_paths": {
"storage_base_path": "/storage",
"catalog_base_path": "/storage/catalogs",
"debug_endpoint": DEBUG_S3_ENDPOINT_PATH,
},
"origin_policy": build_origin_policy_debug(req, app_state),
"known_catalog_count": known_catalogs.and_then(Value::as_array).map(|values| values.len()).unwrap_or(0),
"mapping_application_note": "Catalog ID/name/bucket selection is runtime-applied for this diagnostic probe. Managed-file table visibility rows below are diagnostic checks for supporting metadata, not bucket routing inputs.",
})
}
fn build_s3_request_payload(
req: &HttpRequest,
app_state: &AppState,
resolution: &S3Resolution,
query: &S3DebugQuery,
) -> Value {
let mut payload = match build_request_transport_debug(req, app_state) {
Value::Object(map) => map,
_ => Map::new(),
};
payload.insert(
"configured_header_name".to_string(),
Value::String("X-Athena-Client".to_string()),
);
payload.insert(
"configured_header_value".to_string(),
resolution
.configured_header_value
.clone()
.map(Value::String)
.unwrap_or(Value::Null),
);
payload.insert(
"fallback_header_values".to_string(),
resolution.fallback_header_values.clone(),
);
payload.insert(
"resolved_tenant".to_string(),
resolution
.resolved_tenant
.clone()
.map(Value::String)
.unwrap_or(Value::Null),
);
payload.insert(
"resolution_status".to_string(),
Value::String(resolution.resolution_status.to_string()),
);
payload.insert(
"resolution_mode".to_string(),
resolution
.resolution_mode
.map(|value| Value::String(value.to_string()))
.unwrap_or(Value::Null),
);
payload.insert(
"resolution_error".to_string(),
resolution
.resolution_error
.clone()
.map(Value::String)
.unwrap_or(Value::Null),
);
payload.insert(
"catalog_selector".to_string(),
json!({
"s3_id": query.s3_id.as_deref().map(str::trim).filter(|value| !value.is_empty()),
"name": query.name.as_deref().map(str::trim).filter(|value| !value.is_empty()),
"bucket": query.bucket.as_deref().map(str::trim).filter(|value| !value.is_empty()),
"jwt_secret_supplied": query.jwt_secret.is_some(),
}),
);
Value::Object(payload)
}
async fn build_s3_probe(resolution: &S3Resolution, selection: &CatalogSelection) -> Value {
let Some(metadata_client) = &resolution.metadata_client else {
return json!({
"status": if resolution.resolution_status == "ok" { "metadata_unavailable" } else { resolution.resolution_status },
"tenant_id": resolution.resolved_tenant,
"selected_catalog": Value::Null,
"routing_context": Value::Null,
"search_path": Value::Null,
"upstream_probe": {
"status": if resolution.resolution_status == "ok" { "metadata_unavailable" } else { resolution.resolution_status },
"error": resolution.resolution_error,
},
"resource_resolution": Vec::<Value>::new(),
});
};
let Some(metadata_pool) = &resolution.metadata_pool else {
return json!({
"status": "metadata_unavailable",
"tenant_id": resolution.resolved_tenant,
"selected_catalog": Value::Null,
"routing_context": {
"metadata_client_name": metadata_client.client_name,
"metadata_client_source": metadata_client.source,
},
"search_path": Value::Null,
"upstream_probe": {
"status": "metadata_unavailable",
"error": resolution.resolution_error,
},
"resource_resolution": Vec::<Value>::new(),
});
};
let metadata_context = probe_pg_context(metadata_pool).await.ok();
let mut resource_rows = if let Some(context) = metadata_context.as_ref() {
build_relation_resolution_rows(
metadata_pool,
&context.runtime_effective_schemas,
&[
RelationResolutionSpec {
resource_kind: "metadata_table",
logical_resource: "athena.s3".to_string(),
runtime_lookup_resource: "athena.s3".to_string(),
configured_mapped_resource: None,
mapping_applied_at_runtime: false,
note: "Diagnostic visibility check for catalog definitions.".to_string(),
},
RelationResolutionSpec {
resource_kind: "metadata_table",
logical_resource: "athena.s3_credentials".to_string(),
runtime_lookup_resource: "athena.s3_credentials".to_string(),
configured_mapped_resource: None,
mapping_applied_at_runtime: false,
note: "Diagnostic visibility check for active S3 credentials.".to_string(),
},
RelationResolutionSpec {
resource_kind: "metadata_table",
logical_resource: "athena.files".to_string(),
runtime_lookup_resource: "athena.files".to_string(),
configured_mapped_resource: None,
mapping_applied_at_runtime: false,
note: "Diagnostic visibility check for managed file rows.".to_string(),
},
RelationResolutionSpec {
resource_kind: "metadata_table",
logical_resource: "athena.file_permissions".to_string(),
runtime_lookup_resource: "athena.file_permissions".to_string(),
configured_mapped_resource: None,
mapping_applied_at_runtime: false,
note: "Diagnostic visibility check for managed file permissions.".to_string(),
},
RelationResolutionSpec {
resource_kind: "metadata_table",
logical_resource: "athena.s3_url_cache".to_string(),
runtime_lookup_resource: "athena.s3_url_cache".to_string(),
configured_mapped_resource: None,
mapping_applied_at_runtime: false,
note: "Diagnostic visibility check for presigned URL cache rows.".to_string(),
},
],
)
.await
} else {
Vec::new()
};
let Some(selected_catalog) = selection.selected.as_ref() else {
let note = match selection.status {
"selection_required" => {
"Provide ?s3_id=..., ?name=..., or ?bucket=... when multiple catalogs are present."
}
"unknown_catalog" => {
"The supplied selector did not match any live S3 catalog entry for the resolved tenant."
}
"no_catalogs" => {
"No live S3 catalog entries are registered for the resolved metadata client."
}
_ => "No S3 catalog could be selected for this diagnostic probe.",
};
return json!({
"status": selection.status,
"tenant_id": resolution.resolved_tenant,
"selected_catalog": Value::Null,
"routing_context": {
"metadata_client_name": metadata_client.client_name,
"metadata_client_source": metadata_client.source,
},
"search_path": metadata_context,
"upstream_probe": {
"status": selection.status,
"error": selection.error,
"note": note,
},
"resource_resolution": resource_rows,
});
};
let selected_catalog_id = match Uuid::parse_str(&selected_catalog.catalog.id) {
Ok(value) => value,
Err(error) => {
return json!({
"status": "invalid_header",
"tenant_id": resolution.resolved_tenant,
"selected_catalog": mask_catalog_for_debug(&selected_catalog.catalog),
"routing_context": {
"metadata_client_name": metadata_client.client_name,
"metadata_client_source": metadata_client.source,
},
"search_path": metadata_context,
"upstream_probe": {
"status": "probe_failed",
"error": error.to_string(),
},
"resource_resolution": resource_rows,
});
}
};
let runtime_binding =
match athena_s3::load_s3_catalog_runtime(metadata_pool, selected_catalog_id).await {
Ok(binding) => binding,
Err(error) => {
return json!({
"status": "probe_failed",
"tenant_id": resolution.resolved_tenant,
"selected_catalog": mask_catalog_for_debug(&selected_catalog.catalog),
"routing_context": {
"metadata_client_name": metadata_client.client_name,
"metadata_client_source": metadata_client.source,
},
"search_path": metadata_context,
"upstream_probe": {
"status": "probe_failed",
"error": error.to_string(),
},
"resource_resolution": resource_rows,
});
}
};
let endpoint_validation = validate_storage_endpoint(
runtime_binding
.catalog
.endpoint
.as_deref()
.unwrap_or_default(),
)
.map(|_| "ok".to_string())
.unwrap_or_else(|response| response_to_status_line(&response));
let region_validation = validate_region(
runtime_binding
.catalog
.region
.as_deref()
.unwrap_or_default(),
)
.map(|_| "ok".to_string())
.unwrap_or_else(|response| response_to_status_line(&response));
let bucket_validation = Some(runtime_binding.catalog.bucket.as_str())
.map(validate_bucket_name)
.transpose()
.map(|_| "ok".to_string())
.unwrap_or_else(|response| response_to_status_line(&response));
resource_rows.insert(
0,
json!({
"resource_kind": "catalog_binding",
"logical_resource": runtime_binding.catalog.name,
"runtime_lookup_resource": runtime_binding.catalog.bucket,
"configured_mapped_resource": runtime_binding.catalog.endpoint,
"mapping_applied_at_runtime": true,
"runtime_status": {
"status": "ok",
"provider": runtime_binding.catalog.provider,
"region": runtime_binding.catalog.region,
"active_access_key": mask_access_key(&runtime_binding.access_key_id),
"session_token_present": runtime_binding.session_token.as_ref().is_some_and(|value| !value.trim().is_empty()),
},
"mapped_status": {
"status": "ok",
"path_style_forced": path_style_forced(runtime_binding.catalog.endpoint.as_deref().unwrap_or_default()),
"endpoint_validation": endpoint_validation,
"region_validation": region_validation,
"bucket_validation": bucket_validation,
},
"overall_status": "ok",
"note": "Catalog name, bucket, endpoint, and active credential are the live inputs used by this S3 runtime probe."
}),
);
let upstream_started = Instant::now();
let s3_client = build_s3_client_with_session_token(
runtime_binding
.catalog
.endpoint
.as_deref()
.unwrap_or_default(),
runtime_binding
.catalog
.region
.as_deref()
.unwrap_or_default(),
&runtime_binding.catalog.bucket,
&runtime_binding.access_key_id,
&runtime_binding.secret_key,
runtime_binding.session_token.as_deref(),
)
.await;
let bucket_name = runtime_binding.catalog.bucket.clone();
let upstream_probe = if bucket_name.trim().is_empty() {
json!({
"status": "missing_bucket",
"probe_kind": "s3_list_objects_v2",
"error": "selected catalog does not define a bucket name",
})
} else {
match timeout(
DEBUG_PROBE_TIMEOUT,
s3_client
.list_objects_v2()
.bucket(&bucket_name)
.max_keys(1)
.send(),
)
.await
{
Err(_) => json!({
"status": "probe_failed",
"probe_kind": "s3_list_objects_v2",
"latency_ms": upstream_started.elapsed().as_millis(),
"error": format!("probe timed out after {}s", DEBUG_PROBE_TIMEOUT.as_secs()),
}),
Ok(Err(error)) => json!({
"status": "probe_failed",
"probe_kind": "s3_list_objects_v2",
"latency_ms": upstream_started.elapsed().as_millis(),
"error": error.to_string(),
}),
Ok(Ok(output)) => json!({
"status": "ok",
"probe_kind": "s3_list_objects_v2",
"latency_ms": upstream_started.elapsed().as_millis(),
"object_count_sampled": output.contents().len(),
"is_truncated": output.is_truncated(),
"next_continuation_token_present": output.next_continuation_token().is_some(),
}),
}
};
json!({
"status": upstream_probe.get("status").and_then(Value::as_str).unwrap_or("unknown"),
"tenant_id": resolution.resolved_tenant,
"selected_catalog": mask_catalog_for_debug(&runtime_binding.catalog),
"selection_mode": selected_catalog.selection_mode,
"routing_context": {
"metadata_client_name": metadata_client.client_name,
"metadata_client_source": metadata_client.source,
"provider": runtime_binding.catalog.provider,
"endpoint": runtime_binding.catalog.endpoint,
"region": runtime_binding.catalog.region,
"bucket": runtime_binding.catalog.bucket,
"account_reference": mask_access_key(&runtime_binding.access_key_id),
"path_style_forced": path_style_forced(runtime_binding.catalog.endpoint.as_deref().unwrap_or_default()),
"signing_ready": !runtime_binding.access_key_id.trim().is_empty()
&& !runtime_binding.secret_key.trim().is_empty(),
"session_token_present": runtime_binding.session_token.as_ref().is_some_and(|value| !value.trim().is_empty()),
},
"search_path": metadata_context,
"upstream_probe": upstream_probe,
"resource_resolution": resource_rows,
})
}
fn select_catalog(catalogs: &[athena_s3::S3CatalogItem], query: &S3DebugQuery) -> CatalogSelection {
let select_by = |predicate: &dyn Fn(&athena_s3::S3CatalogItem) -> bool,
mode: &'static str|
-> Option<SelectedCatalog> {
catalogs
.iter()
.find(|catalog| predicate(catalog))
.cloned()
.map(|catalog| SelectedCatalog {
catalog,
selection_mode: mode,
})
};
if let Some(s3_id) = query
.s3_id
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
{
return match select_by(&|catalog| catalog.id == s3_id, "s3_id") {
Some(selected) => CatalogSelection {
selected: Some(selected),
status: "ok",
error: None,
},
None => CatalogSelection {
selected: None,
status: "unknown_catalog",
error: Some(format!("no S3 catalog matched s3_id '{s3_id}'")),
},
};
}
if let Some(name) = query
.name
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
{
return match select_by(&|catalog| catalog.name.eq_ignore_ascii_case(name), "name") {
Some(selected) => CatalogSelection {
selected: Some(selected),
status: "ok",
error: None,
},
None => CatalogSelection {
selected: None,
status: "unknown_catalog",
error: Some(format!("no S3 catalog matched name '{name}'")),
},
};
}
if let Some(bucket) = query
.bucket
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
{
return match select_by(
&|catalog| catalog.bucket.eq_ignore_ascii_case(bucket),
"bucket",
) {
Some(selected) => CatalogSelection {
selected: Some(selected),
status: "ok",
error: None,
},
None => CatalogSelection {
selected: None,
status: "unknown_catalog",
error: Some(format!("no S3 catalog matched bucket '{bucket}'")),
},
};
}
if catalogs.len() == 1 {
return CatalogSelection {
selected: catalogs.first().cloned().map(|catalog| SelectedCatalog {
catalog,
selection_mode: "single_catalog_fallback",
}),
status: "ok",
error: None,
};
}
if catalogs.is_empty() {
return CatalogSelection {
selected: None,
status: "no_catalogs",
error: Some(
"no S3 catalog entries are currently registered for the selected metadata client"
.to_string(),
),
};
}
CatalogSelection {
selected: None,
status: "selection_required",
error: Some(
"multiple S3 catalog entries are present; provide ?s3_id=..., ?name=..., or ?bucket=..."
.to_string(),
),
}
}
fn mask_catalog_for_debug(catalog: &athena_s3::S3CatalogItem) -> Value {
json!({
"id": catalog.id,
"name": catalog.name,
"description": catalog.description,
"endpoint": catalog.endpoint,
"region": catalog.region,
"bucket": catalog.bucket,
"provider": catalog.provider,
"is_active": catalog.is_active,
"active_credential_id": catalog.active_credential_id,
"active_access_key": catalog.active_access_key.as_deref().map(mask_access_key),
"created_at": catalog.created_at,
"updated_at": catalog.updated_at,
})
}
fn mask_access_key(value: &str) -> String {
let trimmed = value.trim();
if trimmed.len() <= 8 {
return "<redacted>".to_string();
}
format!("{}...{}", &trimmed[..4], &trimmed[trimmed.len() - 4..])
}
fn path_style_forced(endpoint: &str) -> bool {
let normalized = endpoint.trim().to_ascii_lowercase();
!normalized.is_empty() && !normalized.contains("amazonaws.com")
}
fn storage_allow_http() -> bool {
env::var("ATHENA_STORAGE_ALLOW_HTTP")
.map(|value| value == "1" || value.eq_ignore_ascii_case("true"))
.unwrap_or(false)
}
fn top_level_status(request_status: &str, probe_status: &str) -> String {
if request_status != "ok" {
return request_status.to_string();
}
probe_status.to_string()
}
fn response_to_status_line(response: &HttpResponse) -> String {
response
.error()
.map(|error| error.to_string())
.unwrap_or_else(|| response.status().to_string())
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
fn catalog(id: &str, name: &str, bucket: Option<&str>) -> athena_s3::S3CatalogItem {
athena_s3::S3CatalogItem {
id: id.to_string(),
name: name.to_string(),
description: String::new(),
endpoint: Some("https://s3.example.com".to_string()),
region: Some("eu-central-1".to_string()),
bucket: bucket.unwrap_or_default().to_string(),
provider: "s3".to_string(),
force_path_style: false,
default_prefix: None,
public_base_url: None,
is_active: true,
metadata: json!({}),
active_credential_id: Some("credential-1".to_string()),
active_access_key: Some("AKIA_TEST_VALUE".to_string()),
created_at: Utc::now(),
updated_at: Utc::now(),
}
}
#[test]
fn select_catalog_requires_selector_when_multiple_catalogs_exist() {
let selection = select_catalog(
&[
catalog("catalog-1", "primary", Some("bucket-a")),
catalog("catalog-2", "archive", Some("bucket-b")),
],
&S3DebugQuery {
jwt_secret: None,
s3_id: None,
name: None,
bucket: None,
},
);
assert_eq!(selection.status, "selection_required");
assert!(selection.selected.is_none());
assert!(
selection
.error
.as_deref()
.is_some_and(|value| value.contains("multiple S3 catalog entries"))
);
}
#[test]
fn select_catalog_reports_unknown_catalog_for_missing_selector_match() {
let selection = select_catalog(
&[catalog("catalog-1", "primary", Some("bucket-a"))],
&S3DebugQuery {
jwt_secret: None,
s3_id: None,
name: Some("missing".to_string()),
bucket: None,
},
);
assert_eq!(selection.status, "unknown_catalog");
assert!(selection.selected.is_none());
assert!(
selection
.error
.as_deref()
.is_some_and(|value| value.contains("missing"))
);
}
#[test]
fn select_catalog_uses_single_catalog_fallback() {
let selection = select_catalog(
&[catalog("catalog-1", "primary", Some("bucket-a"))],
&S3DebugQuery {
jwt_secret: None,
s3_id: None,
name: None,
bucket: None,
},
);
assert_eq!(selection.status, "ok");
assert_eq!(
selection
.selected
.as_ref()
.map(|value| value.selection_mode),
Some("single_catalog_fallback")
);
assert_eq!(
selection
.selected
.as_ref()
.map(|value| value.catalog.id.as_str()),
Some("catalog-1")
);
}
}