athena_rs 3.3.0

Database gateway API
Documentation
use actix_web::dev::ServiceResponse;
use actix_web::web::Data;
use actix_web::{App, http::StatusCode, test};
use athena_rs::AppState;
use athena_rs::api::admin;
use athena_rs::api::auth::{api_key_query_param_used, extract_api_key};
use athena_rs::api::gateway::fetch::fetch_data_route;
use athena_rs::api::gateway::insert::{InsertWindowCoordinator, InsertWindowSettings};
use athena_rs::drivers::postgresql::sqlx_driver::PostgresClientRegistry;
use moka::future::Cache;
use reqwest::Client;
use serde_json::{Value, json};
use std::sync::Arc;
use std::time::Instant;

fn build_test_state(auth_client_name: Option<&str>) -> Data<AppState> {
    let cache: Arc<Cache<String, Value>> = Arc::new(Cache::builder().build());
    let immortal_cache: Arc<Cache<String, Value>> = Arc::new(Cache::builder().build());
    let client: Client = Client::builder()
        .build()
        .expect("Failed to build HTTP client");
    let pg_registry: Arc<PostgresClientRegistry> = Arc::new(PostgresClientRegistry::empty());
    let jdbc_pool_cache: Arc<Cache<String, sqlx::Pool<sqlx::Postgres>>> =
        Arc::new(Cache::builder().max_capacity(64).build());

    let insert_window_coordinator: Arc<InsertWindowCoordinator> =
        InsertWindowCoordinator::new(InsertWindowSettings {
            max_batch: 100,
            max_queued: 10_000,
            deny_tables: Default::default(),
        });
    let data: Data<AppState> = Data::new(AppState {
        cache,
        immortal_cache,
        client,
        process_start_time_seconds: 0,
        process_started_at: Instant::now(),
        pg_registry,
        jdbc_pool_cache,
        #[cfg(feature = "deadpool_experimental")]
        deadpool_registry: Arc::new(
            athena_rs::drivers::postgresql::deadpool_registry::DeadpoolPostgresRegistry::empty(),
        ),
        #[cfg(feature = "deadpool_experimental")]
        jdbc_deadpool_cache: Arc::new(Cache::builder().max_capacity(4).build()),
        gateway_force_camel_case_to_snake_case: false,
        gateway_auto_cast_uuid_filter_values_to_text: true,
        gateway_allow_schema_names_prefixed_as_table_name: true,
        pipeline_registry: None,
        logging_client_name: None,
        gateway_auth_client_name: auth_client_name.map(str::to_string),
        gateway_api_key_fail_mode: "fail_closed".to_string(),
        gateway_jdbc_allow_private_hosts: true,
        gateway_jdbc_allowed_hosts: vec![],
        gateway_resilience_timeout_secs: 30,
        gateway_resilience_read_max_retries: 1,
        gateway_resilience_initial_backoff_ms: 100,
        gateway_admission_store_backend: "redis".to_string(),
        gateway_admission_store_fail_mode: "fail_closed".to_string(),
        prometheus_metrics_enabled: false,
        metrics_state: Arc::new(athena_rs::api::metrics::MetricsState::new()),
        gateway_insert_execution_window_ms: 0,
        gateway_insert_window_max_batch: 100,
        gateway_insert_window_max_queued: 10_000,
        gateway_insert_merge_deny_tables: Default::default(),
        insert_window_coordinator: insert_window_coordinator.clone(),
    });
    insert_window_coordinator.bind_app_state(data.clone());
    data
}

#[actix_web::test]
async fn admin_routes_require_static_admin_key() {
    let state = build_test_state(None);
    let app = test::init_service(
        App::new()
            .app_data(state.clone())
            .configure(admin::services),
    )
    .await;

    let req = test::TestRequest::get().uri("/admin/api-keys").to_request();
    let resp: ServiceResponse = test::call_service(&app, req).await;

    assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
}

#[actix_web::test]
async fn admin_routes_fail_when_auth_store_is_not_configured() {
    unsafe {
        std::env::set_var("ATHENA_KEY_12", "test-admin-key");
    }
    let state = build_test_state(None);
    let app = test::init_service(
        App::new()
            .app_data(state.clone())
            .configure(admin::services),
    )
    .await;

    let req = test::TestRequest::get()
        .uri("/admin/api-keys")
        .insert_header(("X-Athena-Key", "test-admin-key"))
        .to_request();
    let resp: ServiceResponse = test::call_service(&app, req).await;

    assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
}

#[actix_web::test]
async fn gateway_fetch_fails_open_when_auth_store_is_configured_but_unavailable() {
    let state = build_test_state(Some("auth_store"));
    let app =
        test::init_service(App::new().app_data(state.clone()).service(fetch_data_route)).await;

    let req = test::TestRequest::post()
        .uri("/gateway/data")
        .insert_header(("X-Athena-Client", "reporting"))
        .set_json(json!({
            "table_name": "users"
        }))
        .to_request();
    let resp: ServiceResponse = test::call_service(&app, req).await;

    assert_eq!(resp.status(), StatusCode::BAD_REQUEST);
    let body: Value = test::read_body_json(resp).await;
    assert_eq!(body["status"], "error");
}

#[actix_web::test]
async fn extract_api_key_reads_x_athena_key_header() {
    let req = test::TestRequest::with_uri("/gateway/data")
        .insert_header(("X-Athena-Key", "header-key"))
        .to_http_request();

    assert_eq!(extract_api_key(&req).as_deref(), Some("header-key"));
}

#[actix_web::test]
async fn extract_api_key_rejects_query_param_sources() {
    let req = test::TestRequest::with_uri("/gateway/data?apikey=legacy-key").to_http_request();

    assert_eq!(extract_api_key(&req).as_deref(), None);
    assert!(!api_key_query_param_used(&req));
}

#[actix_web::test]
async fn api_key_query_param_used_is_false_for_x_athena_key_header() {
    let req = test::TestRequest::default()
        .insert_header(("x-athena-key", "header-key"))
        .to_http_request();

    assert!(!api_key_query_param_used(&req));
}

#[actix_web::test]
async fn admin_admission_events_route_requires_static_admin_key() {
    let state = build_test_state(None);
    let app = test::init_service(
        App::new()
            .app_data(state.clone())
            .configure(admin::services),
    )
    .await;

    let req = test::TestRequest::get()
        .uri("/admin/admission-events?decision=deferred&client=reporting")
        .to_request();
    let resp: ServiceResponse = test::call_service(&app, req).await;

    assert_eq!(resp.status(), StatusCode::UNAUTHORIZED);
}

#[actix_web::test]
async fn admin_admission_events_route_fails_when_logging_store_is_unavailable() {
    unsafe {
        std::env::set_var("ATHENA_KEY_12", "test-admin-key");
    }
    let state = build_test_state(None);
    let app = test::init_service(
        App::new()
            .app_data(state.clone())
            .configure(admin::services),
    )
    .await;

    let req = test::TestRequest::get()
        .uri("/admin/admission-events?decision=rejected")
        .insert_header(("X-Athena-Key", "test-admin-key"))
        .to_request();
    let resp: ServiceResponse = test::call_service(&app, req).await;

    assert_eq!(resp.status(), StatusCode::SERVICE_UNAVAILABLE);
}