athena_rs 0.83.0

Database gateway API
Documentation
//! Dynamic column name resolution for PostgreSQL tables.
//!
//! Resolves requested column names to actual database column names by querying
//! the information_schema. This handles cases where API requests use camelCase
//! field names that don't match snake_case database columns.

use anyhow::{Result, anyhow};
use once_cell::sync::Lazy;
use sqlx::PgPool;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, warn};

/// Column mappings keyed by table name; each inner map goes from a column name as
/// requested by the API to the column name that actually exists in the database.
type TableColumnMap = HashMap<String, HashMap<String, String>>;

/// Process-wide cache of resolved column mappings.
///
/// Built empty on first access and guarded by a `tokio::sync::RwLock`, so concurrent
/// readers can inspect it while writers take exclusive access.
static COLUMN_CACHE: Lazy<Arc<RwLock<TableColumnMap>>> =
    Lazy::new(|| Arc::new(RwLock::new(TableColumnMap::new())));

/// Lists the column names of `table_name` in the `public` schema, in ordinal order.
///
/// # Errors
/// Returns an error when the `information_schema` query itself fails. An unknown
/// table is not an error at this level: the query simply yields no rows, and the
/// result is an empty vector.
async fn fetch_table_columns(pool: &PgPool, table_name: &str) -> Result<Vec<String>> {
    const COLUMNS_SQL: &str = r#"
        SELECT column_name
        FROM information_schema.columns
        WHERE table_name = $1
        AND table_schema = 'public'
        ORDER BY ordinal_position
    "#;

    sqlx::query_scalar::<_, String>(COLUMNS_SQL)
        .bind(table_name)
        .fetch_all(pool)
        .await
        .map_err(|err| anyhow!("Failed to query table columns: {}", err))
}

/// Finds the best matching column name from available columns.
///
/// Strategies are tried in order; the first one that produces a match wins:
/// 1. Exact match, ignoring case.
/// 2. Match after converting `requested` with
///    `crate::utils::format::camel_to_snake_case`, ignoring case.
/// 3. Prefix match: if the lowercased request contains `_`, the text before the
///    first `_` is used as a prefix, and the shortest column whose lowercased name
///    starts with it is chosen (ties go to the column listed first). For example,
///    `display_username` resolves to `display_name` when that is the shortest
///    `display*` column. A request whose first segment is empty (such as `_foo`)
///    skips this strategy, because an empty prefix would match every column.
///
/// Returns the column spelled exactly as in `available_columns`, or `None` when no
/// strategy matches.
#[doc(hidden)]
pub fn find_matching_column(requested: &str, available_columns: &[String]) -> Option<String> {
    let requested_lower = requested.to_lowercase();
    // Lowercase each column once; every strategy below compares against these.
    let lowered: Vec<String> = available_columns
        .iter()
        .map(|col| col.to_lowercase())
        .collect();
    let find_exact = |needle: &str| -> Option<String> {
        lowered
            .iter()
            .position(|col| col == needle)
            .map(|idx| available_columns[idx].clone())
    };

    // Strategy 1: exact match (case-insensitive).
    if let Some(col) = find_exact(requested_lower.as_str()) {
        return Some(col);
    }

    // Strategy 2: camelCase -> snake_case conversion (case-insensitive).
    let snake_case_version =
        crate::utils::format::camel_to_snake_case(requested).to_lowercase();
    if let Some(col) = find_exact(snake_case_version.as_str()) {
        return Some(col);
    }

    // Strategy 3: prefix match on the segment before the first underscore
    // (e.g. display_username -> display_name).
    let (prefix, _) = requested_lower.split_once('_')?;
    if prefix.is_empty() {
        // `starts_with("")` is true for every column, which would map e.g. `_foo`
        // to whichever column happens to be shortest.
        return None;
    }

    let candidates: Vec<&String> = available_columns
        .iter()
        .zip(&lowered)
        .filter(|(_, lower)| lower.starts_with(prefix))
        .map(|(col, _)| col)
        .collect();

    match candidates.as_slice() {
        [] => None,
        [only] => {
            debug!(
                "Fuzzy matched '{}' to '{}' based on prefix '{}' for columns {:?}",
                requested, only, prefix, available_columns
            );
            Some((*only).clone())
        }
        many => {
            // `min_by_key` returns the first of several equally short candidates,
            // so ties resolve to the earliest column in `available_columns`.
            let best = many.iter().min_by_key(|col| col.len())?;
            debug!(
                "Multiple matches for '{}', choosing shortest: '{}'",
                requested, best
            );
            Some((*best).clone())
        }
    }
}

/// Maps API-supplied column names to real column names of `table_name`.
///
/// When every requested name already has a cached mapping for the table, the
/// answer comes straight from the cache. Otherwise the table's columns are
/// re-read from the database and the names are resolved afresh.
///
/// # Arguments
/// * `pool` - Database connection pool
/// * `table_name` - Name of the table
/// * `requested_columns` - Column names as requested by the API
///
/// # Returns
/// The resolved database column names, in the same order as `requested_columns`.
///
/// # Errors
/// Propagates any error from the database refresh path (query failure, missing
/// table, or an unresolvable column name).
pub async fn resolve_columns(
    pool: &PgPool,
    table_name: &str,
    requested_columns: &[&str],
) -> Result<Vec<String>> {
    // The read guard is confined to this block, so it is released before any
    // database work happens below.
    let from_cache: Option<Vec<String>> = {
        let guard = COLUMN_CACHE.read().await;
        let mapping = guard.get(table_name);
        mapping.and_then(|known| {
            requested_columns
                .iter()
                .map(|&name| known.get(name).cloned())
                .collect()
        })
    };

    match from_cache {
        Some(columns) => Ok(columns),
        // Either the table was never resolved, or some requested name has no
        // cached mapping yet.
        None => refresh_and_resolve(pool, table_name, requested_columns).await,
    }
}

/// Re-reads the table's columns from the database and resolves every requested
/// name against them.
///
/// The resulting mappings are merged into the table's existing entry in the
/// column cache rather than replacing it. Names resolved by earlier calls
/// therefore stay cached, until `clear_cache` removes them. Replacing the entry
/// caused a problem: callers asking for different column sets on the same table
/// evicted each other's mappings, which forced a database round trip on every call.
///
/// # Errors
/// - the column query fails;
/// - the table has no columns in the `public` schema (or does not exist);
/// - some requested name cannot be matched by `find_matching_column`. In that case
///   nothing is written to the cache.
async fn refresh_and_resolve(
    pool: &PgPool,
    table_name: &str,
    requested_columns: &[&str],
) -> Result<Vec<String>> {
    // Fetch all columns for this table
    let available_columns = fetch_table_columns(pool, table_name).await?;

    if available_columns.is_empty() {
        return Err(anyhow!(
            "Table '{}' not found or has no columns",
            table_name
        ));
    }

    // Build the mapping for all requested columns, failing on the first miss.
    let mut table_map = HashMap::with_capacity(requested_columns.len());
    let mut resolved = Vec::with_capacity(requested_columns.len());

    for &requested in requested_columns {
        let Some(actual) = find_matching_column(requested, &available_columns) else {
            warn!(
                "Column '{}' not found in table '{}'. Available columns: {:?}",
                requested, table_name, available_columns
            );
            return Err(anyhow!(
                "Column '{}' does not exist in table '{}'. Available columns: {:?}",
                requested,
                table_name,
                available_columns
            ));
        };
        table_map.insert(requested.to_string(), actual.clone());
        resolved.push(actual);
    }

    // Merge (not overwrite) so mappings cached by earlier calls survive.
    COLUMN_CACHE
        .write()
        .await
        .entry(table_name.to_string())
        .or_default()
        .extend(table_map);

    Ok(resolved)
}

/// Drops cached column mappings.
///
/// With `Some(name)`, only that table's entry is removed; with `None`, the whole
/// cache is emptied. Later resolutions for affected tables miss the cache and
/// query the database again.
pub async fn clear_cache(table_name: Option<&str>) {
    let mut guard = COLUMN_CACHE.write().await;
    if let Some(name) = table_name {
        guard.remove(name);
        debug!("Cleared column cache for table '{}'", name);
    } else {
        guard.clear();
        debug!("Cleared all column caches");
    }
}