athena_rs 3.3.0

Database gateway API
Documentation
use actix_web::body::to_bytes;
use actix_web::dev::ServiceResponse;
use actix_web::http::header::CACHE_CONTROL;
use actix_web::test::TestRequest;
use actix_web::web::Data;
use actix_web::{App, HttpRequest, HttpResponse, test};

use axum::body::Bytes;

use moka::future::Cache;
use reqwest::Client;
use serde_json::{Value, json};
use sqlx::{Pool, Postgres};
use std::sync::Arc;
use std::time::{Duration, Instant};

use athena_rs::AppState;
use athena_rs::api::cache::{
    cache_control::is_cache_control_no_cache,
    check::{
        check_cache_control_and_get_response, check_cache_control_and_get_response_v2,
        get_cached_response,
    },
    hydrate::hydrate_cache_and_return_json,
    rehydrate::check_rehydrate,
};
use athena_rs::api::gateway::insert::{InsertWindowCoordinator, InsertWindowSettings};

use athena_rs::api::metrics::{MetricsState, prometheus_metrics};
use athena_rs::drivers::postgresql::deadpool_registry::DeadpoolPostgresRegistry;
use athena_rs::drivers::postgresql::sqlx_driver::PostgresClientRegistry;

fn build_test_app_state() -> Data<AppState> {
    let cache: Cache<String, Value> = Cache::builder().build();
    let immortal: Cache<String, Value> = Cache::builder().build();
    let jdbc_pool_cache: Arc<Cache<String, Pool<Postgres>>> =
        Arc::new(Cache::builder().max_capacity(64).build());

    let insert_window_coordinator: Arc<InsertWindowCoordinator> =
        InsertWindowCoordinator::new(InsertWindowSettings {
            max_batch: 100,
            max_queued: 10_000,
            deny_tables: Default::default(),
        });
    let data: Data<AppState> = Data::new(AppState {
        cache: Arc::new(cache),
        immortal_cache: Arc::new(immortal),
        client: Client::new(),
        process_start_time_seconds: 0 as i64,
        process_started_at: Instant::now(),
        pg_registry: Arc::new(PostgresClientRegistry::empty()),
        jdbc_pool_cache,
        gateway_force_camel_case_to_snake_case: false,
        gateway_auto_cast_uuid_filter_values_to_text: true,
        gateway_allow_schema_names_prefixed_as_table_name: true,
        pipeline_registry: None,
        logging_client_name: None,
        gateway_auth_client_name: None,
        gateway_api_key_fail_mode: "fail_closed".to_string(),
        gateway_jdbc_allow_private_hosts: true,
        gateway_jdbc_allowed_hosts: vec![],
        gateway_resilience_timeout_secs: 30,
        gateway_resilience_read_max_retries: 1,
        gateway_resilience_initial_backoff_ms: 100,
        gateway_admission_store_backend: "redis".to_string(),
        gateway_admission_store_fail_mode: "fail_closed".to_string(),
        prometheus_metrics_enabled: false,
        metrics_state: Arc::new(MetricsState::new()),
        #[cfg(feature = "deadpool_experimental")]
        deadpool_registry: Arc::new(DeadpoolPostgresRegistry::empty()),
        #[cfg(feature = "deadpool_experimental")]
        jdbc_deadpool_cache: Arc::new(Cache::builder().max_capacity(4).build()),
        gateway_insert_execution_window_ms: 0,
        gateway_insert_window_max_batch: 100,
        gateway_insert_window_max_queued: 10_000,
        gateway_insert_merge_deny_tables: Default::default(),
        insert_window_coordinator: insert_window_coordinator.clone(),
    });
    insert_window_coordinator.bind_app_state(data.clone());
    data
}

#[actix_web::test]
async fn cache_control_no_cache_true_when_header_present() {
    let req: HttpRequest = TestRequest::default()
        .insert_header(("Cache-Control", "no-cache"))
        .to_http_request();
    assert!(is_cache_control_no_cache(&req));
}

#[actix_web::test]
async fn cache_control_no_cache_false_when_header_missing() {
    let req: HttpRequest = TestRequest::default().to_http_request();
    assert!(!is_cache_control_no_cache(&req));
}

#[actix_web::test]
async fn cache_control_no_cache_false_other_directives() {
    let req: HttpRequest = TestRequest::default()
        .insert_header(("Cache-Control", "max-age=60"))
        .to_http_request();
    assert!(!is_cache_control_no_cache(&req));
}

#[actix_web::test]
async fn rehydrate_returns_true_when_present() {
    let req: HttpRequest = TestRequest::default()
        .uri("/path?rehydrate=true")
        .to_http_request();
    assert!(check_rehydrate(&req).await);
}

#[actix_web::test]
async fn rehydrate_returns_false_when_missing() {
    let req: HttpRequest = TestRequest::default()
        .uri("/path?foo=bar")
        .to_http_request();
    assert!(!check_rehydrate(&req).await);
}

#[tokio::test]
async fn hydrate_inserts_and_returns() {
    let data: Data<AppState> = build_test_app_state();
    let key: String = "test_key".to_string();
    let payload: Vec<Value> = vec![json!({"a": 1}), json!({"b": 2})];

    let returned: Vec<Value> =
        hydrate_cache_and_return_json(data.clone(), key.clone(), payload.clone()).await;
    assert_eq!(returned, payload);
    let cached: Option<Value> = data.cache.get(&key).await;
    assert!(cached.is_some());
    assert_eq!(cached.unwrap(), Value::Array(payload));
}

#[tokio::test]
async fn hydrate_overwrites_existing_key() {
    let data: Data<AppState> = build_test_app_state();
    let key: String = "key2".to_string();
    hydrate_cache_and_return_json(data.clone(), key.clone(), vec![json!({"v": 1})]).await;
    hydrate_cache_and_return_json(data.clone(), key.clone(), vec![json!({"v": 2})]).await;
    let cached: Value = data.cache.get(&key).await.unwrap();
    assert_eq!(cached, Value::Array(vec![json!({"v": 2})]));
}

#[tokio::test]
async fn hydrate_writes_raw_fast_path_payload_for_single_item_arrays() {
    let data: Data<AppState> = build_test_app_state();
    let key: String = "raw_key".to_string();
    let payload: Vec<Value> = vec![json!({"v": 3})];

    hydrate_cache_and_return_json(data.clone(), key.clone(), payload).await;

    let raw_key: String = format!("{key}:__raw_json");
    let raw: Option<Value> = data.cache.get(&raw_key).await;
    assert!(raw.is_some());
    assert_eq!(raw.unwrap(), Value::String("{\"v\":3}".to_string()));
}

#[tokio::test]
async fn check_get_cached_response_returns_value() {
    let data: Data<AppState> = build_test_app_state();
    let key: String = "k".to_string();
    data.cache.insert(key.clone(), json!({"x": 1})).await;
    let maybe: Option<actix_web::HttpResponse> = get_cached_response(data, &key).await;
    assert!(maybe.is_some());
}

#[tokio::test]
async fn check_cache_control_no_cache_skips() {
    let data: Data<AppState> = build_test_app_state();
    let key: String = "k2".to_string();
    data.cache.insert(key.clone(), json!({"y": 2})).await;

    let req = TestRequest::default()
        .insert_header((CACHE_CONTROL, "no-cache"))
        .to_http_request();

    let got: Option<HttpResponse> = check_cache_control_and_get_response(&req, data, &key).await;
    assert!(got.is_none());
}

#[tokio::test]
async fn check_v2_flattens_single_item_array() {
    let data: Data<AppState> = build_test_app_state();
    let key: String = "k3".to_string();
    data.cache.insert(key.clone(), json!([{"z": 3}])).await;

    let req: HttpRequest = TestRequest::default().to_http_request();
    let resp: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, data, &key).await;
    assert!(resp.is_some());
}

#[tokio::test]
async fn check_v2_cache_miss_returns_none() {
    let data: Data<AppState> = build_test_app_state();
    let key: String = "missing".to_string();

    let req: HttpRequest = TestRequest::default().to_http_request();
    let resp: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, data, &key).await;
    assert!(resp.is_none());
}

#[tokio::test]
async fn check_v2_prefers_raw_fast_path_payload_when_available() {
    let data: Data<AppState> = build_test_app_state();
    let key: String = "raw_hit".to_string();
    let raw_key: String = format!("{key}:__raw_json");
    data.cache
        .insert(raw_key, Value::String("{\"fast\":true}".to_string()))
        .await;

    let req: HttpRequest = TestRequest::default().to_http_request();
    let resp: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, data, &key).await;
    assert!(resp.is_some());

    let body: Bytes = to_bytes(resp.unwrap().into_body()).await.unwrap();
    let parsed: Value = serde_json::from_slice(&body).unwrap();
    assert_eq!(parsed, json!({"fast": true}));
}

#[tokio::test]
#[ignore = "benchmark-style latency test; run with --release on representative hardware"]
async fn cache_hit_p95_for_large_payload_is_under_50ms_in_release_mode() {
    let data: Data<AppState> = build_test_app_state();
    let key: String = "latency_large_payload".to_string();

    let large_payload: Vec<Value> = (0..2000)
        .map(|idx| {
            json!({
                "id": idx,
                "name": format!("item-{idx}"),
                "active": idx % 2 == 0,
                "score": idx * 3,
                "meta": { "source": "bench", "group": idx % 7 }
            })
        })
        .collect();

    let _ =
        hydrate_cache_and_return_json(data.clone(), key.clone(), vec![json!(large_payload)]).await;

    let req = TestRequest::default().to_http_request();
    let mut samples: Vec<Duration> = Vec::with_capacity(200);

    for _ in 0..200 {
        let started: Instant = Instant::now();
        let resp: Option<HttpResponse> =
            check_cache_control_and_get_response_v2(&req, data.clone(), &key).await;
        assert!(resp.is_some());
        samples.push(started.elapsed());
    }

    samples.sort_unstable();
    let p95_index: usize = ((samples.len() as f64 * 0.95).ceil() as usize)
        .saturating_sub(1)
        .min(samples.len().saturating_sub(1));
    let p95 = samples[p95_index];

    assert!(
        p95.as_millis() < 50,
        "expected p95 cache hit latency < 50ms, got {}ms",
        p95.as_millis()
    );
}

#[tokio::test]
async fn check_strips_nulls_when_header_set() {
    let data: Data<AppState> = build_test_app_state();
    let key: String = "strip".to_string();
    data.cache
        .insert(key.clone(), json!({"data": [{"a": 1, "b": null}]}))
        .await;
    let req = TestRequest::default()
        .insert_header(("X-Strip-Nulls", "true"))
        .to_http_request();
    let resp = check_cache_control_and_get_response(&req, data, &key).await;
    assert!(resp.is_some());
}

#[tokio::test]
async fn cache_lookup_outcomes_are_exported_in_prometheus_metrics() {
    let data: Data<AppState> = build_test_app_state();
    let req: HttpRequest = TestRequest::default().to_http_request();

    let raw_hit_key: String = "metrics_raw_hit".to_string();
    data.cache
        .insert(
            format!("{raw_hit_key}:__raw_json"),
            Value::String("{\"from\":\"raw\"}".to_string()),
        )
        .await;

    let local_hit_key: String = "metrics_local_hit".to_string();
    data.cache
        .insert(local_hit_key.clone(), json!([{"from": "local"}]))
        .await;

    let miss_key: String = "metrics_miss".to_string();

    let _ = check_cache_control_and_get_response_v2(&req, data.clone(), &raw_hit_key).await;
    let _ = check_cache_control_and_get_response_v2(&req, data.clone(), &local_hit_key).await;
    let _ = check_cache_control_and_get_response_v2(&req, data.clone(), &miss_key).await;

    let app = test::init_service(
        App::new()
            .app_data(data.clone())
            .service(prometheus_metrics),
    )
    .await;
    let metrics_req = TestRequest::get().uri("/metrics").to_request();
    let metrics_response: actix_web::dev::ServiceResponse =
        test::call_service(&app, metrics_req).await;
    let metrics_body: Bytes = to_bytes(metrics_response.into_body()).await.unwrap();
    let metrics_text: String = String::from_utf8(metrics_body.to_vec()).unwrap();

    assert!(
        metrics_text.contains(
            "athena_management_mutations_total{operation=\"gateway_fetch_cache_lookup\",status=\"hit_local_raw\"}"
        ),
        "missing hit_local_raw management metric label"
    );

    assert!(
        metrics_text.contains(
            "athena_management_mutations_total{operation=\"gateway_fetch_cache_lookup\",status=\"hit_local\"}"
        ),
        "missing hit_local management metric label"
    );

    assert!(
        metrics_text.contains(
            "athena_management_mutations_total{operation=\"gateway_fetch_cache_lookup\",status=\"miss\"}"
        ),
        "missing miss management metric label"
    );
}

#[tokio::test]
async fn redis_timeout_outcomes_are_exported_in_prometheus_metrics() {
    let data: Data<AppState> = build_test_app_state();

    data.metrics_state.record_management_mutation(
        "gateway_fetch_cache_lookup",
        "redis_get_timeout",
        0.01,
    );
    data.metrics_state.record_management_mutation(
        "gateway_fetch_cache_write",
        "redis_set_timeout",
        0.01,
    );

    let app = test::init_service(
        App::new()
            .app_data(data.clone())
            .service(prometheus_metrics),
    )
    .await;
    let metrics_req = TestRequest::get().uri("/metrics").to_request();
    let metrics_response: ServiceResponse = test::call_service(&app, metrics_req).await;
    let metrics_body: Bytes = to_bytes(metrics_response.into_body()).await.unwrap();
    let metrics_text: String = String::from_utf8(metrics_body.to_vec()).unwrap();

    assert!(
        metrics_text.contains(
            "athena_management_mutations_total{operation=\"gateway_fetch_cache_lookup\",status=\"redis_get_timeout\"}"
        ),
        "missing redis_get_timeout management metric label"
    );

    assert!(
        metrics_text.contains(
            "athena_management_mutations_total{operation=\"gateway_fetch_cache_write\",status=\"redis_set_timeout\"}"
        ),
        "missing redis_set_timeout management metric label"
    );
}