use actix_web::body::to_bytes;
use actix_web::dev::ServiceResponse;
use actix_web::http::header::CACHE_CONTROL;
use actix_web::test::TestRequest;
use actix_web::web::Data;
use actix_web::{App, HttpRequest, HttpResponse, test};
use axum::body::Bytes;
use moka::future::Cache;
use reqwest::Client;
use serde_json::{Value, json};
use sqlx::{Pool, Postgres};
use std::sync::Arc;
use std::time::{Duration, Instant};
use athena_rs::AppState;
use athena_rs::api::cache::{
cache_control::is_cache_control_no_cache,
check::{
check_cache_control_and_get_response, check_cache_control_and_get_response_v2,
get_cached_response,
},
hydrate::hydrate_cache_and_return_json,
rehydrate::check_rehydrate,
};
use athena_rs::api::gateway::insert::{InsertWindowCoordinator, InsertWindowSettings};
use athena_rs::api::metrics::{MetricsState, prometheus_metrics};
use athena_rs::drivers::postgresql::deadpool_registry::DeadpoolPostgresRegistry;
use athena_rs::drivers::postgresql::sqlx_driver::PostgresClientRegistry;
/// Builds a fully-populated `AppState` wrapped in actix `Data` for tests.
///
/// The general-purpose and immortal caches are unbounded; the JDBC pool
/// cache is capped at 64 entries. Gateway settings are deliberately
/// permissive (private hosts allowed, schema-prefixed table names allowed)
/// so individual tests don't need per-test configuration. The insert-window
/// coordinator is constructed first and bound back to the finished state,
/// since it needs a handle to the `AppState` it lives inside.
fn build_test_app_state() -> Data<AppState> {
    let cache: Cache<String, Value> = Cache::builder().build();
    let immortal: Cache<String, Value> = Cache::builder().build();
    let jdbc_pool_cache: Arc<Cache<String, Pool<Postgres>>> =
        Arc::new(Cache::builder().max_capacity(64).build());
    let insert_window_coordinator: Arc<InsertWindowCoordinator> =
        InsertWindowCoordinator::new(InsertWindowSettings {
            max_batch: 100,
            max_queued: 10_000,
            deny_tables: Default::default(),
        });
    let data: Data<AppState> = Data::new(AppState {
        cache: Arc::new(cache),
        immortal_cache: Arc::new(immortal),
        client: Client::new(),
        // Field is i64; the bare literal infers correctly — `0 as i64` was a
        // redundant cast (clippy: unnecessary_cast).
        process_start_time_seconds: 0,
        process_started_at: Instant::now(),
        pg_registry: Arc::new(PostgresClientRegistry::empty()),
        jdbc_pool_cache,
        gateway_force_camel_case_to_snake_case: false,
        gateway_auto_cast_uuid_filter_values_to_text: true,
        gateway_allow_schema_names_prefixed_as_table_name: true,
        pipeline_registry: None,
        logging_client_name: None,
        gateway_auth_client_name: None,
        gateway_api_key_fail_mode: "fail_closed".to_string(),
        gateway_jdbc_allow_private_hosts: true,
        gateway_jdbc_allowed_hosts: vec![],
        gateway_resilience_timeout_secs: 30,
        gateway_resilience_read_max_retries: 1,
        gateway_resilience_initial_backoff_ms: 100,
        gateway_admission_store_backend: "redis".to_string(),
        gateway_admission_store_fail_mode: "fail_closed".to_string(),
        prometheus_metrics_enabled: false,
        metrics_state: Arc::new(MetricsState::new()),
        #[cfg(feature = "deadpool_experimental")]
        deadpool_registry: Arc::new(DeadpoolPostgresRegistry::empty()),
        #[cfg(feature = "deadpool_experimental")]
        jdbc_deadpool_cache: Arc::new(Cache::builder().max_capacity(4).build()),
        gateway_insert_execution_window_ms: 0,
        gateway_insert_window_max_batch: 100,
        gateway_insert_window_max_queued: 10_000,
        gateway_insert_merge_deny_tables: Default::default(),
        insert_window_coordinator: insert_window_coordinator.clone(),
    });
    // The coordinator holds a back-reference to the state it belongs to.
    insert_window_coordinator.bind_app_state(data.clone());
    data
}
#[actix_web::test]
async fn cache_control_no_cache_true_when_header_present() {
let req: HttpRequest = TestRequest::default()
.insert_header(("Cache-Control", "no-cache"))
.to_http_request();
assert!(is_cache_control_no_cache(&req));
}
#[actix_web::test]
async fn cache_control_no_cache_false_when_header_missing() {
let req: HttpRequest = TestRequest::default().to_http_request();
assert!(!is_cache_control_no_cache(&req));
}
#[actix_web::test]
async fn cache_control_no_cache_false_other_directives() {
let req: HttpRequest = TestRequest::default()
.insert_header(("Cache-Control", "max-age=60"))
.to_http_request();
assert!(!is_cache_control_no_cache(&req));
}
#[actix_web::test]
async fn rehydrate_returns_true_when_present() {
let req: HttpRequest = TestRequest::default()
.uri("/path?rehydrate=true")
.to_http_request();
assert!(check_rehydrate(&req).await);
}
#[actix_web::test]
async fn rehydrate_returns_false_when_missing() {
let req: HttpRequest = TestRequest::default()
.uri("/path?foo=bar")
.to_http_request();
assert!(!check_rehydrate(&req).await);
}
#[tokio::test]
async fn hydrate_inserts_and_returns() {
let data: Data<AppState> = build_test_app_state();
let key: String = "test_key".to_string();
let payload: Vec<Value> = vec![json!({"a": 1}), json!({"b": 2})];
let returned: Vec<Value> =
hydrate_cache_and_return_json(data.clone(), key.clone(), payload.clone()).await;
assert_eq!(returned, payload);
let cached: Option<Value> = data.cache.get(&key).await;
assert!(cached.is_some());
assert_eq!(cached.unwrap(), Value::Array(payload));
}
#[tokio::test]
async fn hydrate_overwrites_existing_key() {
let data: Data<AppState> = build_test_app_state();
let key: String = "key2".to_string();
hydrate_cache_and_return_json(data.clone(), key.clone(), vec![json!({"v": 1})]).await;
hydrate_cache_and_return_json(data.clone(), key.clone(), vec![json!({"v": 2})]).await;
let cached: Value = data.cache.get(&key).await.unwrap();
assert_eq!(cached, Value::Array(vec![json!({"v": 2})]));
}
#[tokio::test]
async fn hydrate_writes_raw_fast_path_payload_for_single_item_arrays() {
let data: Data<AppState> = build_test_app_state();
let key: String = "raw_key".to_string();
let payload: Vec<Value> = vec![json!({"v": 3})];
hydrate_cache_and_return_json(data.clone(), key.clone(), payload).await;
let raw_key: String = format!("{key}:__raw_json");
let raw: Option<Value> = data.cache.get(&raw_key).await;
assert!(raw.is_some());
assert_eq!(raw.unwrap(), Value::String("{\"v\":3}".to_string()));
}
#[tokio::test]
async fn check_get_cached_response_returns_value() {
let data: Data<AppState> = build_test_app_state();
let key: String = "k".to_string();
data.cache.insert(key.clone(), json!({"x": 1})).await;
let maybe: Option<actix_web::HttpResponse> = get_cached_response(data, &key).await;
assert!(maybe.is_some());
}
#[tokio::test]
async fn check_cache_control_no_cache_skips() {
let data: Data<AppState> = build_test_app_state();
let key: String = "k2".to_string();
data.cache.insert(key.clone(), json!({"y": 2})).await;
let req = TestRequest::default()
.insert_header((CACHE_CONTROL, "no-cache"))
.to_http_request();
let got: Option<HttpResponse> = check_cache_control_and_get_response(&req, data, &key).await;
assert!(got.is_none());
}
#[tokio::test]
async fn check_v2_flattens_single_item_array() {
let data: Data<AppState> = build_test_app_state();
let key: String = "k3".to_string();
data.cache.insert(key.clone(), json!([{"z": 3}])).await;
let req: HttpRequest = TestRequest::default().to_http_request();
let resp: Option<HttpResponse> =
check_cache_control_and_get_response_v2(&req, data, &key).await;
assert!(resp.is_some());
}
#[tokio::test]
async fn check_v2_cache_miss_returns_none() {
let data: Data<AppState> = build_test_app_state();
let key: String = "missing".to_string();
let req: HttpRequest = TestRequest::default().to_http_request();
let resp: Option<HttpResponse> =
check_cache_control_and_get_response_v2(&req, data, &key).await;
assert!(resp.is_none());
}
#[tokio::test]
async fn check_v2_prefers_raw_fast_path_payload_when_available() {
    // When only the `<key>:__raw_json` entry exists, the v2 lookup must
    // serve the pre-serialized JSON verbatim as the response body.
    let state = build_test_app_state();
    let cache_key = "raw_hit".to_string();
    state
        .cache
        .insert(
            format!("{cache_key}:__raw_json"),
            Value::String("{\"fast\":true}".to_string()),
        )
        .await;
    let request = TestRequest::default().to_http_request();
    let response = check_cache_control_and_get_response_v2(&request, state, &cache_key).await;
    assert!(response.is_some());
    let body: Bytes = to_bytes(response.unwrap().into_body()).await.unwrap();
    let parsed: Value = serde_json::from_slice(&body).unwrap();
    assert_eq!(parsed, json!({"fast": true}));
}
#[tokio::test]
#[ignore = "benchmark-style latency test; run with --release on representative hardware"]
async fn cache_hit_p95_for_large_payload_is_under_50ms_in_release_mode() {
    let state = build_test_app_state();
    let cache_key = "latency_large_payload".to_string();
    // 2000 moderately nested objects. NOTE(review): wrapping the whole batch
    // as a one-element payload (`vec![json!(rows)]`) appears deliberate — it
    // triggers the single-item raw fast path so each hit serves a single
    // pre-serialized document; confirm against the hydrate implementation.
    let rows: Vec<Value> = (0..2000)
        .map(|idx| {
            json!({
                "id": idx,
                "name": format!("item-{idx}"),
                "active": idx % 2 == 0,
                "score": idx * 3,
                "meta": { "source": "bench", "group": idx % 7 }
            })
        })
        .collect();
    let _ =
        hydrate_cache_and_return_json(state.clone(), cache_key.clone(), vec![json!(rows)]).await;
    let request = TestRequest::default().to_http_request();
    let mut latencies: Vec<Duration> = Vec::with_capacity(200);
    for _ in 0..200 {
        let begun = Instant::now();
        let hit =
            check_cache_control_and_get_response_v2(&request, state.clone(), &cache_key).await;
        assert!(hit.is_some());
        latencies.push(begun.elapsed());
    }
    latencies.sort_unstable();
    // 0-based index of the 95th percentile sample, clamped into range
    // (for 200 samples: ceil(190) - 1 = 189, i.e. the 190th smallest).
    let p95_index = ((latencies.len() as f64 * 0.95).ceil() as usize)
        .saturating_sub(1)
        .min(latencies.len().saturating_sub(1));
    let p95 = latencies[p95_index];
    assert!(
        p95.as_millis() < 50,
        "expected p95 cache hit latency < 50ms, got {}ms",
        p95.as_millis()
    );
}
#[tokio::test]
async fn check_strips_nulls_when_header_set() {
let data: Data<AppState> = build_test_app_state();
let key: String = "strip".to_string();
data.cache
.insert(key.clone(), json!({"data": [{"a": 1, "b": null}]}))
.await;
let req = TestRequest::default()
.insert_header(("X-Strip-Nulls", "true"))
.to_http_request();
let resp = check_cache_control_and_get_response(&req, data, &key).await;
assert!(resp.is_some());
}
#[tokio::test]
async fn cache_lookup_outcomes_are_exported_in_prometheus_metrics() {
let data: Data<AppState> = build_test_app_state();
let req: HttpRequest = TestRequest::default().to_http_request();
let raw_hit_key: String = "metrics_raw_hit".to_string();
data.cache
.insert(
format!("{raw_hit_key}:__raw_json"),
Value::String("{\"from\":\"raw\"}".to_string()),
)
.await;
let local_hit_key: String = "metrics_local_hit".to_string();
data.cache
.insert(local_hit_key.clone(), json!([{"from": "local"}]))
.await;
let miss_key: String = "metrics_miss".to_string();
let _ = check_cache_control_and_get_response_v2(&req, data.clone(), &raw_hit_key).await;
let _ = check_cache_control_and_get_response_v2(&req, data.clone(), &local_hit_key).await;
let _ = check_cache_control_and_get_response_v2(&req, data.clone(), &miss_key).await;
let app = test::init_service(
App::new()
.app_data(data.clone())
.service(prometheus_metrics),
)
.await;
let metrics_req = TestRequest::get().uri("/metrics").to_request();
let metrics_response: actix_web::dev::ServiceResponse =
test::call_service(&app, metrics_req).await;
let metrics_body: Bytes = to_bytes(metrics_response.into_body()).await.unwrap();
let metrics_text: String = String::from_utf8(metrics_body.to_vec()).unwrap();
assert!(
metrics_text.contains(
"athena_management_mutations_total{operation=\"gateway_fetch_cache_lookup\",status=\"hit_local_raw\"}"
),
"missing hit_local_raw management metric label"
);
assert!(
metrics_text.contains(
"athena_management_mutations_total{operation=\"gateway_fetch_cache_lookup\",status=\"hit_local\"}"
),
"missing hit_local management metric label"
);
assert!(
metrics_text.contains(
"athena_management_mutations_total{operation=\"gateway_fetch_cache_lookup\",status=\"miss\"}"
),
"missing miss management metric label"
);
}
#[tokio::test]
async fn redis_timeout_outcomes_are_exported_in_prometheus_metrics() {
    // Record the two redis-timeout outcomes directly via the metrics state,
    // then confirm both appear in the Prometheus exposition.
    let state = build_test_app_state();
    for (operation, status) in [
        ("gateway_fetch_cache_lookup", "redis_get_timeout"),
        ("gateway_fetch_cache_write", "redis_set_timeout"),
    ] {
        state
            .metrics_state
            .record_management_mutation(operation, status, 0.01);
    }
    let app = test::init_service(
        App::new()
            .app_data(state.clone())
            .service(prometheus_metrics),
    )
    .await;
    let scrape = TestRequest::get().uri("/metrics").to_request();
    let scrape_response: ServiceResponse = test::call_service(&app, scrape).await;
    let body: Bytes = to_bytes(scrape_response.into_body()).await.unwrap();
    let exposition = String::from_utf8(body.to_vec()).unwrap();
    assert!(
        exposition.contains(
            "athena_management_mutations_total{operation=\"gateway_fetch_cache_lookup\",status=\"redis_get_timeout\"}"
        ),
        "missing redis_get_timeout management metric label"
    );
    assert!(
        exposition.contains(
            "athena_management_mutations_total{operation=\"gateway_fetch_cache_write\",status=\"redis_set_timeout\"}"
        ),
        "missing redis_set_timeout management metric label"
    );
}