// athena_rs 3.3.0
//
// Database gateway API
// Documentation
use actix_web::http::StatusCode;
use actix_web::http::header;
use actix_web::web::Data;
use actix_web::{App, test};
use athena_rs::AppState;
use athena_rs::api::gateway::insert::{InsertWindowCoordinator, InsertWindowSettings};
use athena_rs::api::health::root;
use athena_rs::api::{athena_docs, athena_openapi_host, athena_wss_openapi_host};
use athena_rs::drivers::postgresql::sqlx_driver::PostgresClientRegistry;
use moka::future::Cache;
use reqwest::Client;
use serde_json::Value;
use std::sync::Arc;
use std::time::{Duration, Instant};

/// Contents of `../openapi.yaml` (resolved relative to this source file), embedded at compile
/// time via `include_str!`. Compared byte-for-byte against the body served at `/openapi.yaml`.
const EXPECTED_OPENAPI_YAML: &str = include_str!("../openapi.yaml");
/// Contents of `../openapi-wss.yaml` (resolved relative to this source file), embedded at compile
/// time via `include_str!`. Compared byte-for-byte against the body served at `/openapi-wss.yaml`.
const EXPECTED_OPENAPI_WSS_YAML: &str = include_str!("../openapi-wss.yaml");

/// Builds an [`AppState`] fixture for the handler tests in this file.
///
/// Registries are created through their `empty()` constructors, the caches are fresh in-memory
/// `moka` caches, and the insert-window coordinator is created with the same batch/queue limits
/// that are written into the `gateway_insert_window_*` fields.
///
/// # Panics
///
/// Panics if the `reqwest` HTTP client cannot be built.
fn test_app_state() -> AppState {
    // Entries in the regular cache expire after 60 seconds; the second cache has no TTL set.
    let ttl_cache: Arc<Cache<String, Value>> =
        Arc::new(Cache::builder().time_to_live(Duration::from_secs(60)).build());
    let no_ttl_cache: Arc<Cache<String, Value>> = Arc::new(Cache::builder().build());

    let http_client = Client::builder()
        .pool_idle_timeout(Duration::from_secs(90))
        .build()
        .expect("Failed to build HTTP client");

    let window_settings = InsertWindowSettings {
        max_batch: 100,
        max_queued: 10_000,
        deny_tables: Default::default(),
    };
    let coordinator: Arc<InsertWindowCoordinator> = InsertWindowCoordinator::new(window_settings);

    AppState {
        cache: ttl_cache,
        immortal_cache: no_ttl_cache,
        client: http_client,
        process_start_time_seconds: 0,
        process_started_at: Instant::now(),
        pg_registry: Arc::new(PostgresClientRegistry::empty()),
        jdbc_pool_cache: Arc::new(Cache::builder().max_capacity(64).build()),
        #[cfg(feature = "deadpool_experimental")]
        deadpool_registry: Arc::new(
            athena_rs::drivers::postgresql::deadpool_registry::DeadpoolPostgresRegistry::empty(),
        ),
        #[cfg(feature = "deadpool_experimental")]
        jdbc_deadpool_cache: Arc::new(Cache::builder().max_capacity(4).build()),
        gateway_force_camel_case_to_snake_case: false,
        gateway_auto_cast_uuid_filter_values_to_text: true,
        gateway_allow_schema_names_prefixed_as_table_name: true,
        pipeline_registry: None,
        logging_client_name: None,
        gateway_auth_client_name: None,
        gateway_api_key_fail_mode: "fail_closed".to_string(),
        gateway_jdbc_allow_private_hosts: true,
        gateway_jdbc_allowed_hosts: vec![],
        gateway_resilience_timeout_secs: 30,
        gateway_resilience_read_max_retries: 1,
        gateway_resilience_initial_backoff_ms: 100,
        gateway_admission_store_backend: "redis".to_string(),
        gateway_admission_store_fail_mode: "fail_closed".to_string(),
        prometheus_metrics_enabled: false,
        metrics_state: Arc::new(athena_rs::api::metrics::MetricsState::new()),
        gateway_insert_execution_window_ms: 0,
        gateway_insert_window_max_batch: 100,
        gateway_insert_window_max_queued: 10_000,
        gateway_insert_merge_deny_tables: Default::default(),
        // The coordinator handle is not needed locally afterwards, so it is moved in directly.
        insert_window_coordinator: coordinator,
    }
}

/// `GET /openapi.yaml` must succeed, declare a YAML content type, and return exactly the spec
/// embedded in `EXPECTED_OPENAPI_YAML`.
#[actix_web::test]
async fn test_openapi_yaml_endpoint_serves_embedded_spec() {
    let service = test::init_service(App::new().service(athena_openapi_host)).await;
    let response = test::call_service(
        &service,
        test::TestRequest::get().uri("/openapi.yaml").to_request(),
    )
    .await;

    assert!(response.status().is_success());

    // A header value that is not valid visible ASCII is treated the same as a missing header.
    let declared_type = match response.headers().get(header::CONTENT_TYPE) {
        Some(raw) => raw.to_str().ok(),
        None => None,
    };
    assert_eq!(declared_type, Some("application/yaml; charset=utf-8"));

    let payload = test::read_body(response).await;
    let served_spec = std::str::from_utf8(&payload).unwrap();
    assert_eq!(served_spec, EXPECTED_OPENAPI_YAML);
}

/// A `POST` to `/openapi.yaml` must be answered with `404 Not Found`.
#[actix_web::test]
async fn test_openapi_yaml_endpoint_rejects_post_requests() {
    let post_request = test::TestRequest::post().uri("/openapi.yaml").to_request();
    let service = test::init_service(App::new().service(athena_openapi_host)).await;

    let status = test::call_service(&service, post_request).await.status();

    assert_eq!(status, StatusCode::NOT_FOUND);
}

/// The served OpenAPI document must list the public documentation endpoints
/// (`/openapi.yaml` and `/docs`) as path entries.
#[actix_web::test]
async fn test_openapi_yaml_contains_public_docs_paths() {
    let app = test::init_service(App::new().service(athena_openapi_host)).await;
    let req = test::TestRequest::get().uri("/openapi.yaml").to_request();
    let resp = test::call_service(&app, req).await;

    // Check the status first (as the sibling tests do) so that a failing endpoint is reported
    // as such, rather than as a confusing "path missing" failure on an error body.
    assert!(
        resp.status().is_success(),
        "GET /openapi.yaml returned unexpected status {}",
        resp.status()
    );

    let body = test::read_body(resp).await;
    let body = std::str::from_utf8(&body).expect("OpenAPI spec body should be valid UTF-8");

    // Two-space indentation plus trailing colon matches a key nested one level deep in the YAML.
    for path_key in ["  /openapi.yaml:", "  /docs:"] {
        assert!(
            body.contains(path_key),
            "served OpenAPI spec is missing path entry {path_key:?}"
        );
    }
}

/// `GET /openapi-wss.yaml` must succeed, declare a YAML content type, and return exactly the
/// spec embedded in `EXPECTED_OPENAPI_WSS_YAML`.
#[actix_web::test]
async fn test_openapi_wss_yaml_endpoint_serves_embedded_spec() {
    let service = test::init_service(App::new().service(athena_wss_openapi_host)).await;
    let get_request = test::TestRequest::get().uri("/openapi-wss.yaml").to_request();
    let response = test::call_service(&service, get_request).await;

    assert!(response.status().is_success());

    let response_headers = response.headers();
    let declared_type = response_headers
        .get(header::CONTENT_TYPE)
        .and_then(|raw| raw.to_str().ok());
    assert_eq!(declared_type, Some("application/yaml; charset=utf-8"));

    let payload = test::read_body(response).await;
    let served_spec = std::str::from_utf8(&payload).unwrap();
    assert_eq!(served_spec, EXPECTED_OPENAPI_WSS_YAML);
}

/// A `POST` to `/openapi-wss.yaml` must be answered with `404 Not Found`.
#[actix_web::test]
async fn test_openapi_wss_yaml_endpoint_rejects_post_requests() {
    let post_request = test::TestRequest::post().uri("/openapi-wss.yaml").to_request();
    let service = test::init_service(App::new().service(athena_wss_openapi_host)).await;

    let status = test::call_service(&service, post_request).await.status();

    assert_eq!(status, StatusCode::NOT_FOUND);
}

/// `GET /docs` must answer with a permanent redirect whose `Location` header is the hosted
/// documentation URL.
#[actix_web::test]
async fn test_docs_redirect_is_permanent_and_points_to_hosted_docs() {
    let service = test::init_service(App::new().service(athena_docs)).await;
    let response = test::call_service(
        &service,
        test::TestRequest::get().uri("/docs").to_request(),
    )
    .await;

    assert_eq!(response.status(), StatusCode::PERMANENT_REDIRECT);

    // A missing or non-textual `Location` header both collapse to `None` here.
    let redirect_target = match response.headers().get(header::LOCATION) {
        Some(raw) => raw.to_str().ok(),
        None => None,
    };
    assert_eq!(redirect_target, Some("https://xylex.group/docs/athena"));
}

/// `GET /` must report that the service is online and advertise the health, management,
/// OpenAPI, and docs routes in its `routes` array.
#[actix_web::test]
async fn test_root_lists_openapi_and_docs_routes() {
    let state = test_app_state();
    // Keep a handle to the coordinator before the state is moved into `Data`, then bind the
    // shared state to it. NOTE(review): presumably the coordinator needs the app state to do
    // its work; confirm against `InsertWindowCoordinator::bind_app_state`.
    let coordinator = state.insert_window_coordinator.clone();
    let shared_state = Data::new(state);
    coordinator.bind_app_state(shared_state.clone());

    let service = test::init_service(
        App::new()
            .app_data(shared_state)
            .service(root)
            .service(athena_openapi_host)
            .service(athena_wss_openapi_host)
            .service(athena_docs),
    )
    .await;
    let response = test::call_service(
        &service,
        test::TestRequest::get().uri("/").to_request(),
    )
    .await;

    assert!(response.status().is_success());

    let payload: Value = test::read_body_json(response).await;
    for (field, expected) in [
        ("message", "athena is online"),
        ("athena_deadpool", "offline"),
    ] {
        assert_eq!(payload.get(field).and_then(Value::as_str), Some(expected));
    }

    let routes = payload
        .get("routes")
        .and_then(Value::as_array)
        .expect("routes array should be present");

    // True when some route entry has exactly this `path` and `summary`.
    let has_summary = |path: &str, summary: &str| {
        routes.iter().any(|entry| {
            entry.get("path").and_then(Value::as_str) == Some(path)
                && entry.get("summary").and_then(Value::as_str) == Some(summary)
        })
    };
    // True when some route entry has this `path` and lists "GET" among its `methods`.
    let allows_get = |path: &str| {
        routes.iter().any(|entry| {
            entry.get("path").and_then(Value::as_str) == Some(path)
                && entry
                    .get("methods")
                    .and_then(Value::as_array)
                    .is_some_and(|verbs| verbs.iter().any(|verb| verb.as_str() == Some("GET")))
        })
    };

    assert!(has_summary(
        "/health/cluster",
        "Cluster mirror health and version checks"
    ));
    assert!(has_summary(
        "/management/capabilities",
        "List management API capabilities for a client"
    ));
    assert!(allows_get("/openapi.yaml"));
    assert!(allows_get("/openapi-wss.yaml"));
    assert!(has_summary("/docs", "Documentation redirect"));
}