use actix_web::{HttpRequest, HttpResponse};
use sqlx::postgres::PgPool;
use crate::AppState;
use crate::api::headers::request_context::resolved_athena_client;
use crate::api::response::{bad_request, postgres_client_not_configured, service_unavailable};
use crate::athena::postgres_clients::{
catalog_client_has_database_connection, ensure_catalog_database_client_loaded,
};
use crate::drivers::postgresql::sqlx_driver::RegisteredClient;
pub const ATHENA_CLIENT_HEADER: &str = "X-Athena-Client";
pub fn required_client_name(req: &HttpRequest) -> Result<String, HttpResponse> {
if let Some(client_name) = resolved_athena_client(req)
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
{
return Ok(client_name);
}
req.headers()
.get("x-athena-client")
.and_then(|value| value.to_str().ok())
.map(str::trim)
.filter(|value| !value.is_empty())
.map(ToString::to_string)
.ok_or_else(|| {
bad_request(
"Missing required header",
format!(
"{} header or tenant wildcard hostname is required",
ATHENA_CLIENT_HEADER
),
)
})
}
pub async fn pool_for_client(state: &AppState, client_name: &str) -> Result<PgPool, HttpResponse> {
if let Some(pool) = state.pg_registry.get_pool(client_name) {
return Ok(pool);
}
match ensure_catalog_database_client_loaded(state, client_name).await {
Ok(Some(_)) => {
if let Some(pool) = state.pg_registry.get_pool(client_name) {
return Ok(pool);
}
}
Ok(None) => {}
Err(err) => {
return Err(service_unavailable(
"Client catalog unavailable",
format!("Failed to resolve catalog-backed Postgres client: {}", err),
));
}
}
if let Some(registered_client) = state.pg_registry.registered_client(client_name) {
return Err(unavailable_registered_client_response(
client_name,
®istered_client,
));
}
Err(postgres_client_not_configured(client_name))
}
pub async fn optional_pool_for_client(
state: &AppState,
client_name: &str,
) -> Result<Option<PgPool>, HttpResponse> {
let trimmed = client_name.trim();
if trimmed.is_empty() {
return Ok(None);
}
if state.pg_registry.registered_client(trimmed).is_some() {
return pool_for_client(state, trimmed).await.map(Some);
}
match catalog_client_has_database_connection(state, trimmed).await {
Ok(true) => pool_for_client(state, trimmed).await.map(Some),
Ok(false) => Ok(None),
Err(err) => Err(service_unavailable(
"Client catalog unavailable",
format!("Failed to resolve catalog-backed Postgres client: {}", err),
)),
}
}
pub async fn client_name_and_pool(
req: &HttpRequest,
state: &AppState,
) -> Result<(String, PgPool), HttpResponse> {
let name: String = required_client_name(req)?;
let pool: PgPool = pool_for_client(state, &name).await?;
Ok((name, pool))
}
pub async fn required_client_pool(
req: &HttpRequest,
state: &AppState,
) -> Result<PgPool, HttpResponse> {
let (_, pool) = client_name_and_pool(req, state).await?;
Ok(pool)
}
pub fn registered_client_for(
state: &AppState,
client_name: &str,
) -> Result<RegisteredClient, HttpResponse> {
state
.pg_registry
.registered_client(client_name)
.ok_or_else(|| {
bad_request(
"Unknown client",
format!("Client '{}' is not registered.", client_name),
)
})
}
pub fn logging_pool(state: &AppState) -> Result<PgPool, HttpResponse> {
let (_, pool) = logging_client_name_and_pool(state)?;
Ok(pool)
}
pub fn logging_client_name_and_pool(state: &AppState) -> Result<(String, PgPool), HttpResponse> {
let Some(client_name) = state.logging_client_name.as_ref() else {
return Err(service_unavailable(
"Logging store unavailable",
"No athena logging client is configured.",
));
};
let pool = state.pg_registry.get_pool(client_name).ok_or_else(|| {
service_unavailable(
"Logging store unavailable",
format!("Logging client '{}' is not connected.", client_name),
)
})?;
Ok((client_name.clone(), pool))
}
pub fn auth_pool(state: &AppState) -> Result<PgPool, HttpResponse> {
let Some(client_name) = state.gateway_auth_client_name.as_ref() else {
return Err(service_unavailable(
"Auth store unavailable",
"No gateway auth client is configured.",
));
};
state.pg_registry.get_pool(client_name).ok_or_else(|| {
service_unavailable(
"Auth store unavailable",
format!("Gateway auth client '{}' is not connected.", client_name),
)
})
}
pub async fn request_auth_pool(
req: &HttpRequest,
state: &AppState,
) -> Result<PgPool, HttpResponse> {
let client_name = required_client_name(req)?;
pool_for_client(state, &client_name).await
}
pub fn try_auth_pool(state: &AppState) -> Option<PgPool> {
let client_name: &String = state.gateway_auth_client_name.as_ref()?;
state.pg_registry.get_pool(client_name)
}
fn unavailable_registered_client_response(
client_name: &str,
registered_client: &RegisteredClient,
) -> HttpResponse {
if !registered_client.is_active {
return bad_request(
format!("Client '{}' is inactive", client_name),
"Postgres client is configured but inactive",
);
}
if registered_client.is_frozen {
return bad_request(
format!("Client '{}' is frozen", client_name),
"Postgres client is configured but frozen",
);
}
service_unavailable(
format!(
"Client '{}' is configured but currently unavailable",
client_name
),
"Postgres client is configured but has no active connection pool",
)
}