use crate::AppState;
use actix_web::web::Data;
use once_cell::sync::Lazy;
use std::collections::HashMap;
use std::sync::Mutex;
use std::time::{Duration, Instant};
#[derive(Debug, Clone, Copy, Default)]
pub struct CacheInvalidationSummary {
pub did_run: bool,
pub invalidated_entries: usize,
pub remaining_entries: usize,
pub debounce_delay_ms: Option<u64>,
}
#[derive(Debug, Clone, Copy)]
struct DebounceState {
last_run: Instant,
scheduled: bool,
}
static CACHE_INVALIDATION_DEBOUNCE: Lazy<Mutex<HashMap<String, DebounceState>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
fn invalidation_window_ms() -> u64 {
std::env::var("ATHENA_CACHE_INVALIDATION_WINDOW_MS")
.ok()
.and_then(|raw| raw.trim().parse::<u64>().ok())
.unwrap_or(25)
}
fn debounce_key(client_name: &str, table_name: &str) -> String {
format!(
"{}|{}",
client_name.trim().to_ascii_lowercase(),
table_name.trim().to_ascii_lowercase()
)
}
async fn run_invalidation_now(
app_state: Data<AppState>,
client_name: &str,
table_name: &str,
) -> CacheInvalidationSummary {
let table_prefix = format!("{}:", table_name.trim());
let normalized_client = client_name.trim().to_ascii_lowercase();
let before = app_state.cache.entry_count() as usize;
let _ = app_state.cache.invalidate_entries_if({
move |key, _value| {
let key = key.as_str();
key.starts_with(&table_prefix)
|| key.starts_with("query_count:")
|| key.ends_with(":__raw_json")
|| key.to_ascii_lowercase().contains(&normalized_client)
}
});
app_state.cache.run_pending_tasks().await;
let after = app_state.cache.entry_count() as usize;
CacheInvalidationSummary {
did_run: true,
invalidated_entries: before.saturating_sub(after),
remaining_entries: after,
debounce_delay_ms: None,
}
}
pub async fn invalidate_scoped_gateway_cache(
app_state: Data<AppState>,
client_name: &str,
table_name: &str,
) -> CacheInvalidationSummary {
let window_ms = invalidation_window_ms();
if window_ms == 0 {
return run_invalidation_now(app_state, client_name, table_name).await;
}
let key = debounce_key(client_name, table_name);
let now = Instant::now();
let window = Duration::from_millis(window_ms);
let mut schedule_trailing = false;
let mut trailing_delay_ms = 0_u64;
let should_run_now = if let Ok(mut guard) = CACHE_INVALIDATION_DEBOUNCE.lock() {
let state = guard.entry(key.clone()).or_insert(DebounceState {
last_run: now.checked_sub(window).unwrap_or(now),
scheduled: false,
});
let elapsed = now.saturating_duration_since(state.last_run);
if elapsed < window {
trailing_delay_ms = (window - elapsed).as_millis() as u64;
if !state.scheduled {
state.scheduled = true;
schedule_trailing = true;
}
false
} else {
state.last_run = now;
true
}
} else {
true
};
if should_run_now {
return run_invalidation_now(app_state, client_name, table_name).await;
}
if schedule_trailing {
let app_state_for_task = app_state.clone();
let key_for_task = key.clone();
let client_for_task = client_name.to_string();
let table_for_task = table_name.to_string();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_millis(trailing_delay_ms)).await;
let _ =
run_invalidation_now(app_state_for_task, &client_for_task, &table_for_task).await;
if let Ok(mut guard) = CACHE_INVALIDATION_DEBOUNCE.lock() {
guard.insert(
key_for_task,
DebounceState {
last_run: Instant::now(),
scheduled: false,
},
);
}
});
}
CacheInvalidationSummary {
did_run: false,
invalidated_entries: 0,
remaining_entries: app_state.cache.entry_count() as usize,
debounce_delay_ms: Some(trailing_delay_ms),
}
}