use crate::models::V1UserProfile;
use crate::query::Query;
use crate::state::{AppState, MessageQueue};
use axum::{
extract::{Extension, Path, Query as QueryParam, State},
http::StatusCode,
response::IntoResponse,
Json,
};
use futures::stream::{self, StreamExt};
use redis::AsyncCommands;
use serde::Deserialize;
use serde_json::json;
use std::collections::HashSet;
use tracing::{debug, error, info};
/// Query-string parameters accepted by the cache key listing endpoint.
#[derive(Debug, Deserialize)]
pub struct CacheKeyParams {
    /// Namespace to restrict the listing to. When absent, the handler scans
    /// every namespace it resolves for the requesting user.
    namespace: Option<String>,
}
pub async fn list_cache_keys(
State(state): State<AppState>,
Extension(user_profile): Extension<V1UserProfile>,
QueryParam(params): QueryParam<CacheKeyParams>,
) -> Result<Json<Vec<String>>, (StatusCode, Json<serde_json::Value>)> {
let db_pool = &state.db_pool;
let requested_namespace = params.namespace;
info!(
"Listing cache keys (namespace: {:?}) requested by user: {}",
requested_namespace, user_profile.email
);
let redis_client = match &state.message_queue {
MessageQueue::Redis { client } => client.clone(),
_ => {
error!("Redis client not available in AppState. Cache operations require Redis.");
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": "Redis client not configured on the server." })),
));
}
};
let mut all_keys = HashSet::new();
let namespaces_to_scan: Vec<String> = if let Some(ns) = requested_namespace {
info!("Scanning specific namespace: {}", ns);
vec![ns]
} else {
info!(
"No specific namespace provided. Fetching all accessible namespaces for user {}",
user_profile.email
);
let mut owner_ids: Vec<String> = user_profile
.organizations
.as_ref()
.map(|orgs| orgs.keys().cloned().collect())
.unwrap_or_default();
owner_ids.push(user_profile.email.clone());
let owner_id_refs: Vec<&str> = owner_ids.iter().map(|s| s.as_str()).collect();
match Query::find_namespaces_by_owners(db_pool, &owner_id_refs).await {
Ok(namespaces) => {
let ns_names: Vec<String> = namespaces.into_iter().map(|n| n.id).collect();
info!(
"Found {} accessible namespaces: {:?}",
ns_names.len(),
ns_names
);
ns_names
}
Err(e) => {
error!(
"Failed to query namespaces for owners {:?}: {}",
owner_id_refs, e
);
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": "Failed to retrieve accessible namespaces." })),
));
}
}
};
let results = stream::iter(namespaces_to_scan)
.map(|ns| {
let client = redis_client.clone();
async move {
let pattern = format!("cache:{}:*", ns);
debug!("Scanning Redis with pattern: {}", pattern);
let mut keys_for_ns = Vec::new();
let mut conn = match client.get_multiplexed_async_connection().await {
Ok(conn) => conn,
Err(e) => {
error!("Failed to get Redis connection for namespace {}: {}", ns, e);
return Err(format!("Redis connection failed for namespace {}", ns));
}
};
let mut iter: redis::AsyncIter<String> = match conn.scan_match(&pattern).await {
Ok(iter) => iter,
Err(e) => {
error!("Redis SCAN failed for pattern {}: {}", pattern, e);
return Err(format!("Redis SCAN failed for namespace {}", ns));
}
};
while let Some(key) = iter.next_item().await {
keys_for_ns.push(key);
}
debug!("Found {} keys for namespace {}", keys_for_ns.len(), ns);
Ok(keys_for_ns)
}
})
.buffer_unordered(10) .collect::<Vec<Result<Vec<String>, String>>>()
.await;
for result in results {
match result {
Ok(keys) => {
for key in keys {
all_keys.insert(key);
}
}
Err(e) => {
error!("Error during Redis scan: {}", e);
}
}
}
let final_keys: Vec<String> = all_keys.into_iter().collect();
info!(
"Found {} total unique cache keys across scanned namespaces",
final_keys.len()
);
Ok(Json(final_keys))
}
pub async fn get_cache_key(
State(state): State<AppState>,
Extension(user_profile): Extension<V1UserProfile>,
Path((namespace, key_suffix)): Path<(String, String)>,
) -> Result<Json<String>, (StatusCode, Json<serde_json::Value>)> {
let db_pool = &state.db_pool;
info!(
"Getting cache key suffix '{}' in namespace '{}' requested by user: {}",
key_suffix, namespace, user_profile.email
);
let mut owner_ids: Vec<String> = user_profile
.organizations
.as_ref()
.map(|orgs| orgs.keys().cloned().collect())
.unwrap_or_default();
owner_ids.push(user_profile.email.clone());
let owner_id_refs: Vec<&str> = owner_ids.iter().map(|s| s.as_str()).collect();
let accessible_namespaces =
match Query::find_namespaces_by_owners(db_pool, &owner_id_refs).await {
Ok(namespaces) => namespaces
.into_iter()
.map(|n| n.id)
.collect::<HashSet<String>>(),
Err(e) => {
error!(
"Failed to query namespaces for owners {:?}: {}",
owner_id_refs, e
);
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": "Failed to retrieve accessible namespaces." })),
));
}
};
if !accessible_namespaces.contains(&namespace) {
error!(
"User {} does not have access to namespace '{}'",
user_profile.email, namespace
);
return Err((
StatusCode::FORBIDDEN,
Json(json!({ "error": "Access denied to the specified namespace." })),
));
}
let redis_client = match &state.message_queue {
MessageQueue::Redis { client } => client.clone(),
_ => {
error!("Redis client not available in AppState.");
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": "Redis client not configured." })),
));
}
};
let mut conn = match redis_client.get_multiplexed_async_connection().await {
Ok(conn) => conn,
Err(e) => {
error!("Failed to get async Redis connection: {}", e);
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": "Failed to connect to Redis." })),
));
}
};
let clean_key_suffix = key_suffix.strip_prefix('/').unwrap_or(&key_suffix);
let full_key = format!("cache:{}:{}", namespace, clean_key_suffix);
info!("Attempting to GET key: {}", full_key);
match conn.get::<_, Option<String>>(&full_key).await {
Ok(Some(value)) => {
debug!("Found value for key {}: (value hidden)", full_key);
Ok(Json(value))
}
Ok(None) => {
info!("Key not found: {}", full_key);
Err((
StatusCode::NOT_FOUND,
Json(json!({ "error": "Cache key not found." })),
))
}
Err(e) => {
error!("Redis GET command failed for key {}: {}", full_key, e);
Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": "Failed to execute Redis GET command." })),
))
}
}
}
pub async fn delete_cache_key(
State(state): State<AppState>,
Extension(user_profile): Extension<V1UserProfile>,
Path((namespace, key_suffix)): Path<(String, String)>,
) -> Result<StatusCode, (StatusCode, Json<serde_json::Value>)> {
let db_pool = &state.db_pool;
info!(
"Deleting cache key suffix '{}' in namespace '{}' requested by user: {}",
key_suffix, namespace, user_profile.email
);
let mut owner_ids: Vec<String> = user_profile
.organizations
.as_ref()
.map(|orgs| orgs.keys().cloned().collect())
.unwrap_or_default();
owner_ids.push(user_profile.email.clone());
let owner_id_refs: Vec<&str> = owner_ids.iter().map(|s| s.as_str()).collect();
let accessible_namespaces =
match Query::find_namespaces_by_owners(db_pool, &owner_id_refs).await {
Ok(namespaces) => namespaces
.into_iter()
.map(|n| n.id)
.collect::<HashSet<String>>(),
Err(e) => {
error!(
"Failed to query namespaces for owners {:?}: {}",
owner_id_refs, e
);
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": "Failed to retrieve accessible namespaces." })),
));
}
};
if !accessible_namespaces.contains(&namespace) {
error!(
"User {} does not have access to namespace '{}'",
user_profile.email, namespace
);
return Err((
StatusCode::FORBIDDEN,
Json(json!({ "error": "Access denied to the specified namespace." })),
));
}
let redis_client = match &state.message_queue {
MessageQueue::Redis { client } => client.clone(),
_ => {
error!("Redis client not available in AppState.");
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": "Redis client not configured." })),
));
}
};
let mut conn = match redis_client.get_multiplexed_async_connection().await {
Ok(conn) => conn,
Err(e) => {
error!("Failed to get async Redis connection: {}", e);
return Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": "Failed to connect to Redis." })),
));
}
};
let clean_key_suffix = key_suffix.strip_prefix('/').unwrap_or(&key_suffix);
let full_key = format!("cache:{}:{}", namespace, clean_key_suffix);
info!("Attempting to DEL key: {}", full_key);
match conn.del::<_, i32>(&full_key).await {
Ok(num_deleted) => {
info!(
"DEL command for key '{}' deleted {} keys.",
full_key, num_deleted
);
Ok(StatusCode::OK) }
Err(e) => {
error!("Redis DEL command failed for key {}: {}", full_key, e);
Err((
StatusCode::INTERNAL_SERVER_ERROR,
Json(json!({ "error": "Failed to execute Redis DEL command." })),
))
}
}
}