athena_rs 3.26.2

Hyper-performant polyglot database driver
Documentation
use actix_web::{
    HttpRequest, HttpResponse, Responder, get,
    web::{self, Data, Path, Query},
};
use serde::Deserialize;
use serde_json::{Value, json};
use sha256::digest;

use crate::AppState;
use crate::api::auth::authorize_static_admin_key;
use crate::api::cache::check::{
    CacheLookupOutcome, check_cache_control_and_get_response_v2_with_outcome,
};
use crate::api::cache::hydrate::hydrate_cache_and_return_json_with_write_metric;
use crate::api::headers::response_headers::set_cache_headers;
use crate::api::response::{api_success, not_found, service_unavailable};
use crate::data::clients::get_athena_client_by_name;
use crate::data::table_row_estimates::{
    TableRowEstimateQuery, build_table_row_estimate_snapshot, collect_table_row_estimates,
};

/// Label handed to the cache lookup helper for this route (presumably a metric name; confirm
/// against the cache module).
const TABLE_ROW_ESTIMATE_CACHE_LOOKUP_METRIC: &str = "admin_table_row_estimates_cache_lookup";
/// Label handed to the cache hydration helper for this route (presumably a metric name; confirm
/// against the cache module).
const TABLE_ROW_ESTIMATE_CACHE_WRITE_METRIC: &str = "admin_table_row_estimates_cache_write";
/// Limit applied when the request does not supply a `limit` query parameter.
const DEFAULT_TOP_TABLE_LIMIT: i64 = 12;
/// Upper bound that a requested `limit` is clamped to.
const MAX_TOP_TABLE_LIMIT: i64 = 100;

/// Query-string parameters accepted by the table row estimates endpoint.
///
/// Every field is optional on the wire thanks to `#[serde(default)]`.
#[derive(Debug, Deserialize)]
struct TableRowEstimateParams {
    /// Optional schema name; the handler trims it and treats a blank value as absent.
    #[serde(default)]
    schema: Option<String>,
    /// Optional limit; the handler clamps it to `1..=MAX_TOP_TABLE_LIMIT` and falls back to
    /// `DEFAULT_TOP_TABLE_LIMIT` when it is missing.
    #[serde(default)]
    limit: Option<i64>,
    /// Forwarded to the estimate query as-is; `false` when omitted from the request.
    #[serde(default)]
    include_system_schemas: bool,
}

/// Translates a cache lookup outcome into the short source label reported in response headers.
///
/// Local hits map to `"local"`, Redis hits to `"redis"`, an explicit bypass to `"bypass"`, and
/// every kind of miss to `"database"`.
fn cache_source_from_outcome(outcome: CacheLookupOutcome) -> &'static str {
    // The match is deliberately exhaustive (no wildcard arm) so that adding a new
    // `CacheLookupOutcome` variant forces this mapping to be revisited.
    match outcome {
        CacheLookupOutcome::MissAllTiers
        | CacheLookupOutcome::MissAfterRedisGetError
        | CacheLookupOutcome::MissAfterRedisGetTimeout => "database",
        CacheLookupOutcome::BypassNoCacheHeader => "bypass",
        CacheLookupOutcome::HitRedis => "redis",
        CacheLookupOutcome::HitLocal | CacheLookupOutcome::HitLocalRaw => "local",
    }
}

/// Stamps cache headers onto a response that was served from a cache tier.
///
/// The cached flag is set to `true`, and the source label is derived from `outcome` via
/// [`cache_source_from_outcome`]. The response is returned with its headers updated.
fn apply_cached_headers(
    mut resp: HttpResponse,
    outcome: CacheLookupOutcome,
    cache_key: &str,
) -> HttpResponse {
    let outcome_label = outcome.as_str();
    let source_label = cache_source_from_outcome(outcome);
    let headers = resp.headers_mut();
    set_cache_headers(
        headers,
        true,
        Some(cache_key),
        Some(outcome_label),
        Some(source_label),
    );
    resp
}

/// Stamps cache headers onto a response that was freshly built rather than served from cache.
///
/// The cached flag is set to `false` and the source label is always `"database"`, whatever the
/// lookup outcome was; the outcome itself is still reported through its string form.
fn apply_miss_headers(
    mut resp: HttpResponse,
    outcome: CacheLookupOutcome,
    cache_key: &str,
) -> HttpResponse {
    let outcome_label = outcome.as_str();
    let headers = resp.headers_mut();
    set_cache_headers(
        headers,
        false,
        Some(cache_key),
        Some(outcome_label),
        Some("database"),
    );
    resp
}

/// Resolves the requested row limit into a usable `usize`.
///
/// A missing limit becomes `DEFAULT_TOP_TABLE_LIMIT`; any supplied value is clamped into
/// `1..=MAX_TOP_TABLE_LIMIT`.
fn normalize_limit(limit: Option<i64>) -> usize {
    let bounded = limit
        .unwrap_or(DEFAULT_TOP_TABLE_LIMIT)
        .clamp(1, MAX_TOP_TABLE_LIMIT);

    // After clamping the value is always positive, so the conversion is not expected to fail;
    // the default is kept purely as a defensive fallback.
    match usize::try_from(bounded) {
        Ok(value) => value,
        Err(_) => DEFAULT_TOP_TABLE_LIMIT as usize,
    }
}

/// Builds the cache key for a table row estimates request.
///
/// The request parameters are serialized to JSON and hashed with `digest`, and the resulting
/// string is appended to the `admin_table_row_estimates:` prefix. If JSON serialization fails,
/// the empty string is hashed instead.
fn build_cache_key(
    client_name: &str,
    schema_name: Option<&str>,
    limit: usize,
    include_system_schemas: bool,
) -> String {
    // Field names and their order feed the hash input; keep them stable so existing cache
    // entries stay addressable.
    let fingerprint_input: Value = json!({
        "route": "admin_table_row_estimates",
        "client_name": client_name,
        "schema": schema_name,
        "limit": limit,
        "include_system_schemas": include_system_schemas,
    });
    let serialized = serde_json::to_string(&fingerprint_input).unwrap_or_default();
    let fingerprint = digest(serialized);

    format!("admin_table_row_estimates:{}", fingerprint)
}

/// Admin endpoint that returns table row estimates for a single Athena client.
///
/// Processing order:
/// 1. Authorize the request with the static admin key.
/// 2. Resolve the catalog pool, normalize the client name from the path, and load the client
///    record (responding "not found" when the catalog has no such client).
/// 3. Derive the cache key from the normalized query parameters and return the cached response
///    when the lookup yields one.
/// 4. Otherwise fetch the client's Postgres pool from the registry (responding "service
///    unavailable" when there is none), collect the estimates, write the result to the cache,
///    and return a success response tagged with miss headers.
#[get("/admin/clients/{client_name}/table-row-estimates")]
async fn admin_get_table_row_estimates(
    req: HttpRequest,
    path: Path<String>,
    query: Query<TableRowEstimateParams>,
    app_state: Data<AppState>,
) -> impl Responder {
    if let Err(denied) = authorize_static_admin_key(&req) {
        return denied;
    }

    let catalog_pool = match super::client_catalog_pool(app_state.get_ref()) {
        Ok(pool) => pool,
        Err(rejection) => return rejection,
    };

    let raw_client_name = path.into_inner();
    let client_name = match super::normalize_client_name(&raw_client_name) {
        Ok(name) => name,
        Err(rejection) => return rejection,
    };

    let catalog_lookup = get_athena_client_by_name(&catalog_pool, &client_name).await;
    let client_record = match catalog_lookup {
        Ok(Some(found)) => found,
        Ok(None) => {
            return not_found(
                "Athena client not found",
                format!("No client exists for '{}'.", client_name),
            );
        }
        Err(db_err) => {
            return super::database_error_response("Failed to load Athena client", db_err);
        }
    };

    // A schema filter made only of whitespace is treated the same as no filter at all.
    let schema_name: Option<String> = query.schema.as_deref().and_then(|raw| {
        let trimmed = raw.trim();
        if trimmed.is_empty() {
            None
        } else {
            Some(trimmed.to_string())
        }
    });

    let limit = normalize_limit(query.limit);
    let include_system_schemas = query.include_system_schemas;
    let cache_key = build_cache_key(
        &client_name,
        schema_name.as_deref(),
        limit,
        include_system_schemas,
    );

    let (cache_result, cache_outcome) = check_cache_control_and_get_response_v2_with_outcome(
        &req,
        app_state.clone(),
        &cache_key,
        TABLE_ROW_ESTIMATE_CACHE_LOOKUP_METRIC,
    )
    .await;

    if let Some(cache_hit) = cache_result {
        return apply_cached_headers(cache_hit, cache_outcome, &cache_key);
    }

    // The pool is only needed on the miss path, so it is resolved after the cache lookup.
    let target_pool = match app_state.pg_registry.get_pool(&client_name) {
        Some(pool) => pool,
        None => {
            return service_unavailable(
                "Postgres client not connected",
                format!(
                    "Client '{}' exists in the catalog but does not currently have an active Postgres pool.",
                    client_record.client_name
                ),
            );
        }
    };

    let estimate_query = TableRowEstimateQuery {
        schema_name: schema_name.clone(),
        include_system_schemas,
    };

    let rows = match collect_table_row_estimates(&target_pool, &estimate_query).await {
        Ok(rows) => rows,
        Err(db_err) => {
            return super::database_error_response("Failed to load table row estimates", db_err);
        }
    };

    let snapshot = build_table_row_estimate_snapshot(&rows, limit);
    let data = json!({
        "client_name": client_name,
        "schema": schema_name,
        "limit": limit,
        "include_system_schemas": include_system_schemas,
        "snapshot": snapshot,
    });
    let envelope = json!({
        "status": "success",
        "message": "Loaded table row estimates",
        "data": data.clone(),
    });

    // Write the envelope to the cache before responding; the helper's return value is not used.
    hydrate_cache_and_return_json_with_write_metric(
        app_state.clone(),
        cache_key.clone(),
        vec![envelope],
        TABLE_ROW_ESTIMATE_CACHE_WRITE_METRIC,
    )
    .await;

    let fresh_response = api_success("Loaded table row estimates", data);
    apply_miss_headers(fresh_response, cache_outcome, &cache_key)
}

/// Registers this module's routes on the given actix-web service configuration.
///
/// Currently this adds only the `admin_get_table_row_estimates` handler.
pub(super) fn services(cfg: &mut web::ServiceConfig) {
    cfg.service(admin_get_table_row_estimates);
}