use actix_web::{
HttpRequest, HttpResponse, Responder, get,
web::{self, Data, Path, Query},
};
use serde::Deserialize;
use serde_json::{Value, json};
use sha256::digest;
use crate::AppState;
use crate::api::auth::authorize_static_admin_key;
use crate::api::cache::check::{
CacheLookupOutcome, check_cache_control_and_get_response_v2_with_outcome,
};
use crate::api::cache::hydrate::hydrate_cache_and_return_json_with_write_metric;
use crate::api::headers::response_headers::set_cache_headers;
use crate::api::response::{api_success, not_found, service_unavailable};
use crate::data::clients::get_athena_client_by_name;
use crate::data::table_row_estimates::{
TableRowEstimateQuery, build_table_row_estimate_snapshot, collect_table_row_estimates,
};
const TABLE_ROW_ESTIMATE_CACHE_LOOKUP_METRIC: &str = "admin_table_row_estimates_cache_lookup";
const TABLE_ROW_ESTIMATE_CACHE_WRITE_METRIC: &str = "admin_table_row_estimates_cache_write";
const DEFAULT_TOP_TABLE_LIMIT: i64 = 12;
const MAX_TOP_TABLE_LIMIT: i64 = 100;
#[derive(Debug, Deserialize)]
struct TableRowEstimateParams {
#[serde(default)]
schema: Option<String>,
#[serde(default)]
limit: Option<i64>,
#[serde(default)]
include_system_schemas: bool,
}
fn cache_source_from_outcome(outcome: CacheLookupOutcome) -> &'static str {
match outcome {
CacheLookupOutcome::HitLocalRaw | CacheLookupOutcome::HitLocal => "local",
CacheLookupOutcome::HitRedis => "redis",
CacheLookupOutcome::BypassNoCacheHeader => "bypass",
CacheLookupOutcome::MissAllTiers
| CacheLookupOutcome::MissAfterRedisGetError
| CacheLookupOutcome::MissAfterRedisGetTimeout => "database",
}
}
fn apply_cached_headers(
mut resp: HttpResponse,
outcome: CacheLookupOutcome,
cache_key: &str,
) -> HttpResponse {
let cache_source: &str = cache_source_from_outcome(outcome);
set_cache_headers(
resp.headers_mut(),
true,
Some(cache_key),
Some(outcome.as_str()),
Some(cache_source),
);
resp
}
fn apply_miss_headers(
mut resp: HttpResponse,
outcome: CacheLookupOutcome,
cache_key: &str,
) -> HttpResponse {
set_cache_headers(
resp.headers_mut(),
false,
Some(cache_key),
Some(outcome.as_str()),
Some("database"),
);
resp
}
fn normalize_limit(limit: Option<i64>) -> usize {
let limit: i64 = limit.unwrap_or(DEFAULT_TOP_TABLE_LIMIT);
let clamped: i64 = limit.clamp(1, MAX_TOP_TABLE_LIMIT);
usize::try_from(clamped).unwrap_or(DEFAULT_TOP_TABLE_LIMIT as usize)
}
fn build_cache_key(
client_name: &str,
schema_name: Option<&str>,
limit: usize,
include_system_schemas: bool,
) -> String {
let input: Value = json!({
"route": "admin_table_row_estimates",
"client_name": client_name,
"schema": schema_name,
"limit": limit,
"include_system_schemas": include_system_schemas,
});
format!(
"admin_table_row_estimates:{}",
digest(serde_json::to_string(&input).unwrap_or_default())
)
}
#[get("/admin/clients/{client_name}/table-row-estimates")]
async fn admin_get_table_row_estimates(
req: HttpRequest,
path: Path<String>,
query: Query<TableRowEstimateParams>,
app_state: Data<AppState>,
) -> impl Responder {
if let Err(resp) = authorize_static_admin_key(&req) {
return resp;
}
let catalog_pool = match super::client_catalog_pool(app_state.get_ref()) {
Ok(pool) => pool,
Err(resp) => return resp,
};
let client_name = match super::normalize_client_name(&path.into_inner()) {
Ok(value) => value,
Err(resp) => return resp,
};
let client_record = match get_athena_client_by_name(&catalog_pool, &client_name).await {
Ok(Some(record)) => record,
Ok(None) => {
return not_found(
"Athena client not found",
format!("No client exists for '{}'.", client_name),
);
}
Err(err) => {
return super::database_error_response("Failed to load Athena client", err);
}
};
let schema_name = query
.schema
.as_deref()
.map(str::trim)
.filter(|value| !value.is_empty())
.map(str::to_string);
let limit = normalize_limit(query.limit);
let include_system_schemas = query.include_system_schemas;
let cache_key = build_cache_key(
&client_name,
schema_name.as_deref(),
limit,
include_system_schemas,
);
let (cache_result, cache_outcome): (Option<HttpResponse>, CacheLookupOutcome) =
check_cache_control_and_get_response_v2_with_outcome(
&req,
app_state.clone(),
&cache_key,
TABLE_ROW_ESTIMATE_CACHE_LOOKUP_METRIC,
)
.await;
if let Some(cached_response) = cache_result {
return apply_cached_headers(cached_response, cache_outcome, &cache_key);
}
let Some(target_pool) = app_state.pg_registry.get_pool(&client_name) else {
return service_unavailable(
"Postgres client not connected",
format!(
"Client '{}' exists in the catalog but does not currently have an active Postgres pool.",
client_record.client_name
),
);
};
let estimate_query = TableRowEstimateQuery {
schema_name: schema_name.clone(),
include_system_schemas,
};
match collect_table_row_estimates(&target_pool, &estimate_query).await {
Ok(rows) => {
let snapshot = build_table_row_estimate_snapshot(&rows, limit);
let data = json!({
"client_name": client_name,
"schema": schema_name,
"limit": limit,
"include_system_schemas": include_system_schemas,
"snapshot": snapshot,
});
let envelope = json!({
"status": "success",
"message": "Loaded table row estimates",
"data": data.clone(),
});
hydrate_cache_and_return_json_with_write_metric(
app_state.clone(),
cache_key.clone(),
vec![envelope],
TABLE_ROW_ESTIMATE_CACHE_WRITE_METRIC,
)
.await;
let response = api_success("Loaded table row estimates", data);
apply_miss_headers(response, cache_outcome, &cache_key)
}
Err(err) => super::database_error_response("Failed to load table row estimates", err),
}
}
pub(super) fn services(cfg: &mut web::ServiceConfig) {
cfg.service(admin_get_table_row_estimates);
}