use crate::AppState;
use crate::utils::redis_client::{
GLOBAL_REDIS, note_redis_failure_and_start_cooldown, note_redis_success,
redis_operation_timeout, should_bypass_redis_temporarily,
};
use actix_web::{HttpRequest, HttpResponse, web::Data};
use serde_json::{Value, json};
use std::time::Instant;
/// Suffix that marks a local-cache entry holding a pre-serialized JSON body.
const RAW_CACHE_KEY_SUFFIX: &str = "__raw_json";

/// Derives the key under which the pre-serialized ("raw") JSON body for
/// `cache_key` is stored: `<cache_key>:<RAW_CACHE_KEY_SUFFIX>`.
fn raw_cache_key(cache_key: &str) -> String {
    [cache_key, RAW_CACHE_KEY_SUFFIX].join(":")
}
/// Classification of how a v2 cache lookup was resolved.
///
/// The value is returned to callers of
/// `check_cache_control_and_get_response_v2_with_outcome` and its
/// [`as_str`](CacheLookupOutcome::as_str) label is used when recording metrics.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum CacheLookupOutcome {
    /// The request's `Cache-Control` header asked for `no-cache`.
    BypassNoCacheHeader,
    /// Served from the pre-serialized JSON entry in the local cache.
    HitLocalRaw,
    /// Served from the structured entry in the local cache.
    HitLocal,
    /// Served from Redis.
    HitRedis,
    /// No tier produced a value.
    MissAllTiers,
    /// Miss reported after the Redis GET returned an error.
    MissAfterRedisGetError,
    /// Miss reported after the Redis GET timed out.
    MissAfterRedisGetTimeout,
}

impl CacheLookupOutcome {
    /// Returns the snake_case label for this outcome.
    pub fn as_str(self) -> &'static str {
        match self {
            Self::BypassNoCacheHeader => "bypass_no_cache_header",
            Self::HitLocalRaw => "hit_local_raw",
            Self::HitLocal => "hit_local",
            Self::HitRedis => "hit_redis",
            Self::MissAllTiers => "miss_all_tiers",
            Self::MissAfterRedisGetError => "miss_after_redis_get_error",
            Self::MissAfterRedisGetTimeout => "miss_after_redis_get_timeout",
        }
    }
}
/// Looks up `cache_key` in the local cache and then in Redis, returning a
/// `200 OK` JSON response on a hit.
///
/// Lookup order:
/// 1. `app_state.cache` — a hit is returned immediately.
/// 2. Redis, if `GLOBAL_REDIS` is initialised and Redis is not temporarily
///    bypassed. The GET is bounded by `redis_operation_timeout()`. A non-null
///    value is copied into the local cache before being returned.
///
/// Returns `None` when no tier has a value, when Redis is unavailable or
/// bypassed, or when the Redis GET fails or times out. Failures and timeouts
/// start the Redis cooldown, are recorded under the
/// `gateway_fetch_cache_lookup` metric and are logged at `warn` level.
pub async fn get_cached_response(
    app_state: Data<AppState>,
    cache_key: &str,
) -> Option<HttpResponse> {
    let started_at: Instant = Instant::now();

    // The cache lookup yields an owned value, so it can be serialised
    // directly without cloning it a second time.
    if let Some(cached_response) = app_state.cache.get(cache_key).await {
        return Some(HttpResponse::Ok().json(cached_response));
    }

    let redis = GLOBAL_REDIS.get()?;
    if should_bypass_redis_temporarily() {
        return None;
    }

    let redis_lookup_result =
        tokio::time::timeout(redis_operation_timeout(), redis.get(cache_key)).await;
    match redis_lookup_result {
        Ok(Ok(value)) if !value.is_null() => {
            note_redis_success();
            // Promote the Redis value into the local cache for later lookups.
            app_state
                .cache
                .insert(cache_key.to_string(), value.clone())
                .await;
            Some(HttpResponse::Ok().json(value))
        }
        // Redis answered, but holds nothing for this key.
        Ok(Ok(_)) => {
            note_redis_success();
            None
        }
        Ok(Err(err)) => {
            note_redis_failure_and_start_cooldown();
            app_state.metrics_state.record_management_mutation(
                "gateway_fetch_cache_lookup",
                "redis_get_error",
                started_at.elapsed().as_secs_f64(),
            );
            tracing::warn!(error = %err, cache_key = %cache_key, "Redis get failed; continuing with local cache only");
            None
        }
        Err(_) => {
            note_redis_failure_and_start_cooldown();
            app_state.metrics_state.record_management_mutation(
                "gateway_fetch_cache_lookup",
                "redis_get_timeout",
                started_at.elapsed().as_secs_f64(),
            );
            tracing::warn!(cache_key = %cache_key, "Redis get timed out; continuing with local cache only");
            None
        }
    }
}
/// Serves `cache_key` from the local cache unless the client asked to bypass it.
///
/// Returns `None` when the request's `Cache-Control` header contains the
/// `no-cache` directive or when the local cache has no entry for `cache_key`.
/// Otherwise returns a `200 OK` JSON response of the form
/// `{"success": true, "data": <cached value>}`. Redis is not consulted.
pub async fn check_cache_control_and_get_response(
    req: &HttpRequest,
    app_state: Data<AppState>,
    cache_key: &str,
) -> Option<HttpResponse> {
    // HTTP cache directives are case-insensitive, so normalise the header
    // value before looking for `no-cache`.
    if let Some(cache_control_header) = req.headers().get("Cache-Control")
        && let Ok(cache_control_value) = cache_control_header.to_str()
        && cache_control_value
            .to_ascii_lowercase()
            .contains("no-cache")
    {
        return None;
    }

    let cached_response = app_state.cache.get(cache_key).await?;
    // `json!` serialises interpolated expressions by reference, so the cached
    // value does not need to be cloned first.
    Some(HttpResponse::Ok().json(json!({
        "success": true,
        "data": cached_response
    })))
}
/// Convenience wrapper around
/// `check_cache_control_and_get_response_v2_with_outcome` that records under
/// the `gateway_fetch_cache_lookup` metric and discards the outcome, returning
/// only the optional cached response.
pub async fn check_cache_control_and_get_response_v2(
    req: &HttpRequest,
    app_state: Data<AppState>,
    cache_key: &str,
) -> Option<HttpResponse> {
    let lookup = check_cache_control_and_get_response_v2_with_outcome(
        req,
        app_state,
        cache_key,
        "gateway_fetch_cache_lookup",
    );
    // Only the response half of the `(response, outcome)` pair is needed here.
    lookup.await.0
}
pub async fn check_cache_control_and_get_response_v2_with_outcome(
req: &HttpRequest,
app_state: Data<AppState>,
cache_key: &str,
lookup_metric: &str,
) -> (Option<HttpResponse>, CacheLookupOutcome) {
let started_at: Instant = Instant::now();
if let Some(cache_control_header) = req.headers().get("Cache-Control")
&& let Ok(cache_control_value) = cache_control_header.to_str()
&& cache_control_value.contains("no-cache")
{
app_state.metrics_state.record_management_mutation(
lookup_metric,
CacheLookupOutcome::BypassNoCacheHeader.as_str(),
started_at.elapsed().as_secs_f64(),
);
return (None, CacheLookupOutcome::BypassNoCacheHeader);
}
let raw_key: String = raw_cache_key(cache_key);
if let Some(Value::String(raw_body)) = app_state.cache.get(&raw_key).await {
let elapsed_secs = started_at.elapsed().as_secs_f64();
app_state.metrics_state.record_management_mutation(
lookup_metric,
CacheLookupOutcome::HitLocalRaw.as_str(),
elapsed_secs,
);
tracing::debug!(
cache_key = %cache_key,
source = "local_raw",
duration_ms = started_at.elapsed().as_millis(),
"gateway fetch cache hit"
);
return (
Some(
HttpResponse::Ok()
.content_type("application/json")
.body(raw_body),
),
CacheLookupOutcome::HitLocalRaw,
);
}
if let Some(cached_response) = app_state.cache.get(cache_key).await {
if let Value::Array(arr) = &cached_response
&& arr.len() == 1
{
let elapsed_secs = started_at.elapsed().as_secs_f64();
app_state.metrics_state.record_management_mutation(
lookup_metric,
CacheLookupOutcome::HitLocal.as_str(),
elapsed_secs,
);
tracing::debug!(
cache_key = %cache_key,
source = "local",
duration_ms = started_at.elapsed().as_millis(),
"gateway fetch cache hit"
);
return (
Some(HttpResponse::Ok().json(&arr[0])),
CacheLookupOutcome::HitLocal,
);
}
let elapsed_secs = started_at.elapsed().as_secs_f64();
app_state.metrics_state.record_management_mutation(
lookup_metric,
CacheLookupOutcome::HitLocal.as_str(),
elapsed_secs,
);
tracing::debug!(
cache_key = %cache_key,
source = "local",
duration_ms = started_at.elapsed().as_millis(),
"gateway fetch cache hit"
);
return (
Some(HttpResponse::Ok().json(cached_response)),
CacheLookupOutcome::HitLocal,
);
}
let mut miss_outcome: CacheLookupOutcome = CacheLookupOutcome::MissAllTiers;
if let Some(redis) = GLOBAL_REDIS.get() {
if should_bypass_redis_temporarily() {
miss_outcome = CacheLookupOutcome::MissAfterRedisGetTimeout;
} else {
let redis_lookup_result =
tokio::time::timeout(redis_operation_timeout(), redis.get(cache_key)).await;
match redis_lookup_result {
Ok(Ok(value)) if !value.is_null() => {
note_redis_success();
app_state
.cache
.insert(cache_key.to_string(), value.clone())
.await;
let serialized_for_fast_path = if let Value::Array(arr) = &value {
if arr.len() == 1 {
serde_json::to_string(&arr[0]).ok()
} else {
serde_json::to_string(&value).ok()
}
} else {
serde_json::to_string(&value).ok()
};
if let Some(raw_body) = serialized_for_fast_path {
app_state
.cache
.insert(raw_key, Value::String(raw_body))
.await;
}
if let Value::Array(arr) = &value
&& arr.len() == 1
{
let elapsed_secs = started_at.elapsed().as_secs_f64();
app_state.metrics_state.record_management_mutation(
lookup_metric,
CacheLookupOutcome::HitRedis.as_str(),
elapsed_secs,
);
tracing::debug!(
cache_key = %cache_key,
source = "redis",
duration_ms = started_at.elapsed().as_millis(),
"gateway fetch cache hit"
);
return (
Some(HttpResponse::Ok().json(&arr[0])),
CacheLookupOutcome::HitRedis,
);
}
let elapsed_secs = started_at.elapsed().as_secs_f64();
app_state.metrics_state.record_management_mutation(
lookup_metric,
CacheLookupOutcome::HitRedis.as_str(),
elapsed_secs,
);
tracing::debug!(
cache_key = %cache_key,
source = "redis",
duration_ms = started_at.elapsed().as_millis(),
"gateway fetch cache hit"
);
return (
Some(HttpResponse::Ok().json(value)),
CacheLookupOutcome::HitRedis,
);
}
Ok(Ok(_)) => note_redis_success(),
Ok(Err(err)) => {
note_redis_failure_and_start_cooldown();
miss_outcome = CacheLookupOutcome::MissAfterRedisGetError;
app_state.metrics_state.record_management_mutation(
lookup_metric,
"redis_get_error",
started_at.elapsed().as_secs_f64(),
);
tracing::warn!(error = %err, cache_key = %cache_key, "Redis get failed; continuing with local cache only");
}
Err(_) => {
note_redis_failure_and_start_cooldown();
miss_outcome = CacheLookupOutcome::MissAfterRedisGetTimeout;
app_state.metrics_state.record_management_mutation(
lookup_metric,
"redis_get_timeout",
started_at.elapsed().as_secs_f64(),
);
tracing::warn!(cache_key = %cache_key, "Redis get timed out; continuing with local cache only");
}
}
}
}
app_state.metrics_state.record_management_mutation(
lookup_metric,
CacheLookupOutcome::MissAllTiers.as_str(),
started_at.elapsed().as_secs_f64(),
);
app_state.metrics_state.record_management_mutation(
lookup_metric,
"miss",
started_at.elapsed().as_secs_f64(),
);
(None, miss_outcome)
}
// Unit tests for the outcome reporting of
// `check_cache_control_and_get_response_v2_with_outcome`. Only the paths that
// are resolved before any Redis lookup are exercised here.
#[cfg(test)]
mod tests {
use super::{
CacheLookupOutcome, check_cache_control_and_get_response_v2_with_outcome, raw_cache_key,
};
use crate::AppState;
use crate::api::gateway::insert::{InsertWindowCoordinator, InsertWindowSettings};
use crate::api::metrics::MetricsState;
use crate::drivers::postgresql::sqlx_driver::PostgresClientRegistry;
use actix_web::test::TestRequest;
use actix_web::web::Data;
use moka::future::Cache;
use reqwest::Client;
use serde_json::{Value, json};
use std::sync::Arc;
use std::time::Instant;
// Builds an `AppState` for tests: small in-memory caches, an empty Postgres
// client registry, a fresh `MetricsState`, and an insert-window coordinator
// that is bound back to the returned state.
fn test_app_state() -> Data<AppState> {
let cache: Arc<Cache<String, Value>> = Arc::new(Cache::builder().max_capacity(100).build());
let immortal_cache: Arc<Cache<String, Value>> =
Arc::new(Cache::builder().max_capacity(100).build());
let jdbc_pool_cache = Arc::new(Cache::builder().max_capacity(4).build());
let insert_window_coordinator: Arc<InsertWindowCoordinator> =
InsertWindowCoordinator::new(InsertWindowSettings {
max_batch: 100,
max_queued: 10_000,
deny_tables: Default::default(),
});
let data: Data<AppState> = Data::new(AppState {
cache,
immortal_cache,
client: Client::new(),
process_start_time_seconds: 0,
process_started_at: Instant::now(),
pg_registry: Arc::new(PostgresClientRegistry::empty()),
jdbc_pool_cache,
#[cfg(feature = "deadpool_experimental")]
deadpool_registry: Arc::new(
crate::drivers::postgresql::deadpool_registry::DeadpoolPostgresRegistry::empty(),
),
#[cfg(feature = "deadpool_experimental")]
jdbc_deadpool_cache: Arc::new(Cache::builder().max_capacity(4).build()),
gateway_force_camel_case_to_snake_case: false,
gateway_auto_cast_uuid_filter_values_to_text: true,
gateway_allow_schema_names_prefixed_as_table_name: true,
pipeline_registry: None,
logging_client_name: None,
gateway_auth_client_name: None,
gateway_api_key_fail_mode: "fail_closed".to_string(),
gateway_jdbc_allow_private_hosts: false,
gateway_jdbc_allowed_hosts: Vec::new(),
gateway_resilience_timeout_secs: 30,
gateway_resilience_read_max_retries: 1,
gateway_resilience_initial_backoff_ms: 100,
gateway_admission_store_backend: "redis".to_string(),
gateway_admission_store_fail_mode: "fail_closed".to_string(),
prometheus_metrics_enabled: false,
metrics_state: Arc::new(MetricsState::new()),
gateway_insert_execution_window_ms: 0,
gateway_insert_window_max_batch: 100,
gateway_insert_window_max_queued: 10_000,
gateway_insert_merge_deny_tables: Default::default(),
insert_window_coordinator: insert_window_coordinator.clone(),
});
// The coordinator receives a handle to the state it is stored in.
insert_window_coordinator.bind_app_state(data.clone());
data
}
// A request carrying `Cache-Control: no-cache` yields no response and is
// reported as `BypassNoCacheHeader`.
#[tokio::test]
async fn v2_outcome_bypass_no_cache_header() {
let state = test_app_state();
let req = TestRequest::default()
.insert_header(("Cache-Control", "no-cache"))
.to_http_request();
let (response, outcome) = check_cache_control_and_get_response_v2_with_outcome(
&req,
state,
"cache-key",
"gateway_fetch_cache_lookup",
)
.await;
assert!(response.is_none());
assert_eq!(outcome, CacheLookupOutcome::BypassNoCacheHeader);
}
// A string entry stored under the raw key is served and reported as
// `HitLocalRaw`.
#[tokio::test]
async fn v2_outcome_hit_local_raw() {
let state = test_app_state();
state
.cache
.insert(
raw_cache_key("cache-key"),
Value::String("{\"ok\":true}".to_string()),
)
.await;
let req = TestRequest::default().to_http_request();
let (response, outcome) = check_cache_control_and_get_response_v2_with_outcome(
&req,
state,
"cache-key",
"gateway_fetch_cache_lookup",
)
.await;
assert!(response.is_some());
assert_eq!(outcome, CacheLookupOutcome::HitLocalRaw);
}
// With no raw entry present, a single-element array stored under the plain
// key is served and reported as `HitLocal`.
#[tokio::test]
async fn v2_outcome_hit_local() {
let state = test_app_state();
state
.cache
.insert(
"cache-key".to_string(),
Value::Array(vec![json!({"data": 1})]),
)
.await;
let req = TestRequest::default().to_http_request();
let (response, outcome) = check_cache_control_and_get_response_v2_with_outcome(
&req,
state,
"cache-key",
"gateway_fetch_cache_lookup",
)
.await;
assert!(response.is_some());
assert_eq!(outcome, CacheLookupOutcome::HitLocal);
}
}