athena_rs 3.3.0

Database gateway API
Documentation
use crate::AppState;
use actix_web::web::Data;
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};

#[derive(Debug, Clone, Copy, Default)]
pub struct CacheInvalidationSummary {
    pub did_run: bool,
    pub invalidated_entries: usize,
    pub remaining_entries: usize,
    pub debounce_delay_ms: Option<u64>,
}

#[derive(Debug, Clone, Copy)]
struct DebounceState {
    last_run: Instant,
    scheduled: bool,
}

static CACHE_INVALIDATION_DEBOUNCE: Lazy<Mutex<HashMap<String, DebounceState>>> =
    Lazy::new(|| Mutex::new(HashMap::new()));

fn invalidation_window_ms() -> u64 {
    std::env::var("ATHENA_CACHE_INVALIDATION_WINDOW_MS")
        .ok()
        .and_then(|raw| raw.trim().parse::<u64>().ok())
        .unwrap_or(25)
}

fn debounce_key(client_name: &str, table_name: &str) -> String {
    format!(
        "{}|{}",
        client_name.trim().to_ascii_lowercase(),
        table_name.trim().to_ascii_lowercase()
    )
}

async fn run_invalidation_now(
    app_state: Data<AppState>,
    client_name: &str,
    table_name: &str,
) -> CacheInvalidationSummary {
    let table_prefix = format!("{}:", table_name.trim());
    let normalized_client = client_name.trim().to_ascii_lowercase();
    let before = app_state.cache.entry_count() as usize;

    let _ = app_state.cache.invalidate_entries_if({
        move |key, _value| {
            let key = key.as_str();
            key.starts_with(&table_prefix)
                || key.starts_with("query_count:")
                || key.ends_with(":__raw_json")
                || key.to_ascii_lowercase().contains(&normalized_client)
        }
    });

    app_state.cache.run_pending_tasks().await;
    let after = app_state.cache.entry_count() as usize;

    CacheInvalidationSummary {
        did_run: true,
        invalidated_entries: before.saturating_sub(after),
        remaining_entries: after,
        debounce_delay_ms: None,
    }
}

/// Invalidates only cache keys related to a specific client/table/query family.
///
/// Current strategy:
/// - table-scoped gateway fetch keys start with `<table_name>:`
/// - query-count family keys use `query_count:`
/// - raw fast-path variants use `:__raw_json`
pub async fn invalidate_scoped_gateway_cache(
    app_state: Data<AppState>,
    client_name: &str,
    table_name: &str,
) -> CacheInvalidationSummary {
    let window_ms = invalidation_window_ms();
    if window_ms == 0 {
        return run_invalidation_now(app_state, client_name, table_name).await;
    }

    let key = debounce_key(client_name, table_name);
    let now = Instant::now();
    let window = Duration::from_millis(window_ms);

    let mut schedule_trailing = false;
    let mut trailing_delay_ms = 0_u64;
    let should_run_now = if let Ok(mut guard) = CACHE_INVALIDATION_DEBOUNCE.lock() {
        let state = guard.entry(key.clone()).or_insert(DebounceState {
            last_run: now.checked_sub(window).unwrap_or(now),
            scheduled: false,
        });
        let elapsed = now.saturating_duration_since(state.last_run);
        if elapsed < window {
            trailing_delay_ms = (window - elapsed).as_millis() as u64;
            if !state.scheduled {
                state.scheduled = true;
                schedule_trailing = true;
            }
            false
        } else {
            state.last_run = now;
            true
        }
    } else {
        true
    };

    if should_run_now {
        return run_invalidation_now(app_state, client_name, table_name).await;
    }

    if schedule_trailing {
        let app_state_for_task = app_state.clone();
        let key_for_task = key.clone();
        let client_for_task = client_name.to_string();
        let table_for_task = table_name.to_string();
        tokio::spawn(async move {
            tokio::time::sleep(Duration::from_millis(trailing_delay_ms)).await;
            let _ =
                run_invalidation_now(app_state_for_task, &client_for_task, &table_for_task).await;
            if let Ok(mut guard) = CACHE_INVALIDATION_DEBOUNCE.lock() {
                guard.insert(
                    key_for_task,
                    DebounceState {
                        last_run: Instant::now(),
                        scheduled: false,
                    },
                );
            }
        });
    }

    CacheInvalidationSummary {
        did_run: false,
        invalidated_entries: 0,
        remaining_entries: app_state.cache.entry_count() as usize,
        debounce_delay_ms: Some(trailing_delay_ms),
    }
}