//! athena_rs 3.3.0 — database gateway API documentation.
//!
//! ## Cache hydration
//!
//! This module provides functionality to hydrate the application's shared cache with JSON data.
//! It includes a function to insert JSON data into the cache and return the same data.

use crate::AppState;
use actix_web::web::Data;
use serde_json::Value;
use tokio::time::error::Elapsed;
// no-op: keep imports minimal here
use crate::utils::redis_client::{
    GLOBAL_REDIS, note_redis_failure_and_start_cooldown, note_redis_success,
    redis_operation_timeout, should_bypass_redis_temporarily,
};
use std::time::Instant;

/// Suffix appended to a cache key to address the pre-serialized ("raw") JSON companion entry.
const RAW_CACHE_KEY_SUFFIX: &str = "__raw_json";

/// Derives the companion cache key (`"<cache_key>:__raw_json"`) used to store
/// the raw JSON string alongside the structured value.
fn raw_cache_key(cache_key: &str) -> String {
    // Build the key with a single allocation of the exact required size.
    let mut key = String::with_capacity(cache_key.len() + 1 + RAW_CACHE_KEY_SUFFIX.len());
    key.push_str(cache_key);
    key.push(':');
    key.push_str(RAW_CACHE_KEY_SUFFIX);
    key
}

/// Inserts `json_body` into the shared caches under `cache_key` and hands the same body back.
///
/// Convenience wrapper around [`hydrate_cache_and_return_json_with_write_metric`]
/// that reports Redis write telemetry under the default
/// `gateway_fetch_cache_write` metric name.
///
/// # Arguments
///
/// * `app_state` - Shared application state (`Data<AppState>`) holding the caches.
/// * `cache_key` - Key under which the JSON body is stored.
/// * `json_body` - JSON values to cache.
///
/// # Returns
///
/// * `Vec<Value>` - The unchanged `json_body`, so the call can sit at the tail of a handler.
///
/// # Example
///
/// ```rust,no_run
/// # use actix_web::web::Data;
/// # use athena_rs::api::cache::hydrate::hydrate_cache_and_return_json;
/// # use athena_rs::AppState;
/// # use athena_rs::drivers::postgresql::sqlx_driver::PostgresClientRegistry;
/// # use moka::future::Cache;
/// # use reqwest::Client;
/// # use serde_json::json;
/// # use std::sync::Arc;
/// # use std::time::Instant;
/// # async fn doc_example() {
/// #     let cache: Cache<String, serde_json::Value> = Cache::builder().build();
/// #     let immortal: Cache<String, serde_json::Value> = Cache::builder().build();
/// #     let jdbc_pool_cache = Arc::new(Cache::builder().max_capacity(64).build());
/// #     let app_state = AppState {
/// #         cache: Arc::new(cache),
/// #         immortal_cache: Arc::new(immortal),
/// #         client: Client::new(),
/// #         process_start_time_seconds: 0,
/// #         process_started_at: Instant::now(),
/// #         pg_registry: Arc::new(PostgresClientRegistry::empty()),
/// #         jdbc_pool_cache,
/// #         #[cfg(feature = "deadpool_experimental")]
/// #         deadpool_registry: Arc::new(
/// #             athena_rs::drivers::postgresql::deadpool_registry::DeadpoolPostgresRegistry::empty(),
/// #         ),
/// #         #[cfg(feature = "deadpool_experimental")]
/// #         jdbc_deadpool_cache: Arc::new(Cache::builder().max_capacity(4).build()),
/// #         gateway_force_camel_case_to_snake_case: false,
/// #         gateway_auto_cast_uuid_filter_values_to_text: true,
/// #         gateway_allow_schema_names_prefixed_as_table_name: true,
/// #         pipeline_registry: None,
/// #         logging_client_name: None,
/// #         gateway_auth_client_name: None,
/// #         gateway_api_key_fail_mode: "fail_closed".to_string(),
/// #         gateway_jdbc_allow_private_hosts: false,
/// #         gateway_jdbc_allowed_hosts: Vec::new(),
/// #         gateway_resilience_timeout_secs: 30,
/// #         gateway_resilience_read_max_retries: 1,
/// #         gateway_resilience_initial_backoff_ms: 100,
/// #         gateway_admission_store_backend: "redis".to_string(),
/// #         gateway_admission_store_fail_mode: "fail_closed".to_string(),
/// #         prometheus_metrics_enabled: false,
/// #         metrics_state: Arc::new(athena_rs::api::metrics::MetricsState::new()),
/// #     };
/// #     let app_state = Data::new(app_state);
/// let cache_key = String::from("my_cache_key");
/// let json_body = vec![json!(1), json!(2), json!(3)];
/// let result = hydrate_cache_and_return_json(app_state, cache_key, json_body).await;
/// assert_eq!(result, vec![json!(1), json!(2), json!(3)]);
/// # }
/// ```
pub async fn hydrate_cache_and_return_json(
    app_state: Data<AppState>,
    cache_key: String,
    json_body: Vec<Value>,
) -> Vec<Value> {
    // Delegate to the metric-aware variant with the default gateway-fetch label.
    let write_metric = "gateway_fetch_cache_write";
    hydrate_cache_and_return_json_with_write_metric(app_state, cache_key, json_body, write_metric)
        .await
}

/// Same as [`hydrate_cache_and_return_json`] but uses `write_metric` for Redis write telemetry
/// (e.g. `query_count_cache_write`).
pub async fn hydrate_cache_and_return_json_with_write_metric(
    app_state: Data<AppState>,
    cache_key: String,
    json_body: Vec<Value>,
    write_metric: &str,
) -> Vec<Value> {
    // Store just the array payload in the cache to match consumers' expectations
    let value_to_cache: Value = Value::Array(json_body.clone());
    let raw_body: Option<String> = match &value_to_cache {
        Value::Array(arr) if arr.len() == 1 => serde_json::to_string(&arr[0]).ok(),
        _ => serde_json::to_string(&value_to_cache).ok(),
    };

    app_state
        .cache
        .insert(cache_key.clone(), value_to_cache)
        .await;

    if let Some(raw_body) = raw_body {
        app_state
            .cache
            .insert(raw_cache_key(&cache_key), Value::String(raw_body))
            .await;
    }

    // Also write-through to Redis with a TTL that mirrors the in-memory cache TTL if desired
    if let Some(redis) = GLOBAL_REDIS.get() {
        if should_bypass_redis_temporarily() {
            return json_body;
        }
        let redis_ttl_secs: u64 = app_state
            .cache
            .policy()
            .time_to_live()
            .map(|ttl| ttl.as_secs())
            .filter(|ttl| *ttl > 0)
            .unwrap_or(900u64);
        let redis_write_started_at: Instant = Instant::now();
        let redis_write_result: Result<Result<(), String>, Elapsed> = tokio::time::timeout(
            redis_operation_timeout(),
            redis.set_with_ttl(&cache_key, &Value::Array(json_body.clone()), redis_ttl_secs),
        )
        .await;

        match redis_write_result {
            Ok(Ok(())) => {
                note_redis_success();
                app_state.metrics_state.record_management_mutation(
                    write_metric,
                    "redis_set_ok",
                    redis_write_started_at.elapsed().as_secs_f64(),
                );
                tracing::debug!(cache_key = %cache_key, ttl_secs = redis_ttl_secs, "Redis set succeeded during cache hydration");
            }
            Ok(Err(err)) => {
                note_redis_failure_and_start_cooldown();
                app_state.metrics_state.record_management_mutation(
                    write_metric,
                    "redis_set_error",
                    redis_write_started_at.elapsed().as_secs_f64(),
                );
                tracing::warn!(error = %err, cache_key = %cache_key, ttl_secs = redis_ttl_secs, "Redis set failed during cache hydration");
            }
            Err(_) => {
                note_redis_failure_and_start_cooldown();
                app_state.metrics_state.record_management_mutation(
                    write_metric,
                    "redis_set_timeout",
                    redis_write_started_at.elapsed().as_secs_f64(),
                );
                tracing::warn!(cache_key = %cache_key, ttl_secs = redis_ttl_secs, "Redis set timed out during cache hydration");
            }
        }
    }

    json_body
}