athena-gateway 3.18.0

Portable gateway request contracts and normalization primitives for Athena
Documentation
//! Legacy `GET /data` compatibility planning for gateway fetch reads.
//!
//! This module owns the request-domain translation from query parameters into
//! the internal `/gateway/fetch` JSON body plus the associated cache-key
//! material. Runtime adapters keep auth, cache IO, and HTTP response handling.

use serde_json::{Value, json};
use std::collections::HashMap;

use crate::normalize_column_name;

fn normalize_columns_csv(columns: &str, force: bool) -> String {
    if !force {
        return columns.to_string();
    }

    columns
        .split(',')
        .map(str::trim)
        .filter(|value| !value.is_empty())
        .map(|value| normalize_column_name(value, true))
        .collect::<Vec<_>>()
        .join(",")
}

fn parse_optional_i32(query: &HashMap<String, String>, key: &str) -> Option<i32> {
    query.get(key).and_then(|value| value.parse::<i32>().ok())
}

/// Query-parameter-derived plan for the legacy `GET /data` compatibility route.
#[derive(Debug, Clone)]
pub struct GatewayGetFetchCompatibilityPlan {
    /// View/table requested by the compatibility route.
    pub view: String,
    /// Internal fetch JSON body delegated to the POST handler.
    pub request_body: Value,
    /// Legacy unhashed cache key preserved for compatibility lookup.
    pub legacy_cache_key: String,
    /// Primary hashed cache key used by the modern GET compatibility path.
    pub hashed_cache_key: String,
    /// Legacy short-hash cache key still probed for compatibility.
    pub legacy_hashed_cache_key: String,
}

/// Validation errors for legacy `GET /data` query parsing.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum GatewayGetFetchCompatibilityError {
    MissingView,
    MissingEqColumn,
    MissingEqValue,
}

impl GatewayGetFetchCompatibilityError {
    /// Returns the public parameter name associated with this validation error.
    pub fn parameter_name(self) -> &'static str {
        match self {
            Self::MissingView => "view",
            Self::MissingEqColumn => "eq_column",
            Self::MissingEqValue => "eq_value",
        }
    }
}

/// Builds the compatibility fetch plan for `GET /data`.
pub fn build_gateway_get_fetch_compatibility_plan(
    query: &HashMap<String, String>,
    client_name: &str,
    force_camel_case_to_snake_case: bool,
) -> Result<GatewayGetFetchCompatibilityPlan, GatewayGetFetchCompatibilityError> {
    let view = query
        .get("view")
        .cloned()
        .ok_or(GatewayGetFetchCompatibilityError::MissingView)?;
    let eq_column = query
        .get("eq_column")
        .cloned()
        .ok_or(GatewayGetFetchCompatibilityError::MissingEqColumn)?;
    let eq_value = query
        .get("eq_value")
        .cloned()
        .ok_or(GatewayGetFetchCompatibilityError::MissingEqValue)?;

    let columns = query.get("columns").cloned();
    let limit = parse_optional_i32(query, "limit");
    let current_page = parse_optional_i32(query, "current_page");
    let page_size = parse_optional_i32(query, "page_size");
    let offset = parse_optional_i32(query, "offset");
    let total_pages = parse_optional_i32(query, "total_pages");
    let strip_nulls = query
        .get("strip_nulls")
        .and_then(|value| value.parse::<bool>().ok())
        .unwrap_or(false);
    let sort_by = query.get("sort_by").cloned();
    let sort_direction = query.get("sort_direction").cloned();

    let normalized_columns_param = columns
        .as_ref()
        .map(|cols| normalize_columns_csv(cols, force_camel_case_to_snake_case));

    let legacy_cache_key = format!(
        "get_data_route:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}",
        view,
        eq_column,
        eq_value,
        normalized_columns_param.as_deref().unwrap_or(""),
        limit.unwrap_or(0),
        current_page.unwrap_or(1),
        page_size.unwrap_or(100),
        offset.unwrap_or(0),
        total_pages.unwrap_or(1),
        strip_nulls,
        sort_by.as_deref().unwrap_or(""),
        sort_direction.as_deref().unwrap_or(""),
        client_name
    );

    let get_hash_input = json!({
        "view": view,
        "eq_column": eq_column,
        "eq_value": eq_value,
        "columns": normalized_columns_param.clone(),
        "limit": limit,
        "current_page": current_page,
        "page_size": page_size,
        "offset": offset,
        "total_pages": total_pages,
        "strip_nulls": strip_nulls,
        "sort_by": sort_by,
        "sort_direction": sort_direction,
        "client": client_name,
    });
    let get_hash_str = sha256::digest(serde_json::to_string(&get_hash_input).unwrap_or_default());
    let hashed_cache_key = format!(
        "{}-h{}",
        legacy_cache_key,
        &get_hash_str[..16.min(get_hash_str.len())]
    );
    let legacy_hashed_cache_key = format!("{}-h{}", legacy_cache_key, &get_hash_str[..8]);

    let condition = json!({
        "eq_column": eq_column,
        "eq_value": eq_value
    });

    let mut request_body = json!({
        "table_name": view,
        "conditions": [condition]
    });

    if let Some(cols) = normalized_columns_param {
        request_body["columns"] = json!(cols);
    }
    if let Some(value) = limit {
        request_body["limit"] = json!(value);
    }
    if let Some(value) = current_page {
        request_body["current_page"] = json!(value);
    }
    if let Some(value) = page_size {
        request_body["page_size"] = json!(value);
    }
    if let Some(value) = offset {
        request_body["offset"] = json!(value);
    }
    if let Some(value) = total_pages {
        request_body["total_pages"] = json!(value);
    }
    if strip_nulls {
        request_body["strip_nulls"] = json!(true);
    }
    if let Some(ref column) = sort_by
        && !column.is_empty()
    {
        let direction = match sort_direction
            .as_deref()
            .unwrap_or("asc")
            .to_ascii_lowercase()
            .as_str()
        {
            "desc" | "descending" => "desc",
            _ => "asc",
        };
        request_body["sortBy"] = json!({ "field": column, "direction": direction });
    }

    Ok(GatewayGetFetchCompatibilityPlan {
        view,
        request_body,
        legacy_cache_key,
        hashed_cache_key,
        legacy_hashed_cache_key,
    })
}

#[cfg(test)]
mod tests {
    use super::{GatewayGetFetchCompatibilityError, build_gateway_get_fetch_compatibility_plan};
    use serde_json::json;
    use std::collections::HashMap;

    fn query_map(entries: &[(&str, &str)]) -> HashMap<String, String> {
        entries
            .iter()
            .map(|(key, value)| ((*key).to_string(), (*value).to_string()))
            .collect()
    }

    #[test]
    fn compatibility_plan_requires_view() {
        let err = build_gateway_get_fetch_compatibility_plan(&HashMap::new(), "reporting", false)
            .expect_err("missing view should fail");
        assert_eq!(err, GatewayGetFetchCompatibilityError::MissingView);
        assert_eq!(err.parameter_name(), "view");
    }

    #[test]
    fn compatibility_plan_requires_eq_column() {
        let query = query_map(&[("view", "users")]);
        let err = build_gateway_get_fetch_compatibility_plan(&query, "reporting", false)
            .expect_err("missing eq_column should fail");
        assert_eq!(err, GatewayGetFetchCompatibilityError::MissingEqColumn);
    }

    #[test]
    fn compatibility_plan_builds_request_body_and_cache_keys() {
        let query = query_map(&[
            ("view", "users"),
            ("eq_column", "workspaceId"),
            ("eq_value", "wk_123"),
            ("columns", "id,displayName"),
            ("limit", "25"),
            ("current_page", "3"),
            ("page_size", "50"),
            ("offset", "4"),
            ("total_pages", "9"),
            ("strip_nulls", "true"),
            ("sort_by", "displayName"),
            ("sort_direction", "descending"),
        ]);

        let plan = build_gateway_get_fetch_compatibility_plan(&query, "reporting", true)
            .expect("plan should parse");

        assert_eq!(plan.view, "users");
        assert_eq!(
            plan.request_body,
            json!({
                "table_name": "users",
                "conditions": [
                    { "eq_column": "workspaceId", "eq_value": "wk_123" }
                ],
                "columns": "id,display_name",
                "limit": 25,
                "current_page": 3,
                "page_size": 50,
                "offset": 4,
                "total_pages": 9,
                "strip_nulls": true,
                "sortBy": { "field": "displayName", "direction": "desc" }
            })
        );
        assert_eq!(
            plan.legacy_cache_key,
            "get_data_route:users:workspaceId:wk_123:id,display_name:25:3:50:4:9:true:displayName:descending:reporting"
        );
        assert!(plan.hashed_cache_key.starts_with(&plan.legacy_cache_key));
        assert!(
            plan.legacy_hashed_cache_key
                .starts_with(&plan.legacy_cache_key)
        );
        assert_ne!(plan.hashed_cache_key, plan.legacy_hashed_cache_key);
    }

    #[test]
    fn compatibility_plan_omits_empty_sort_body() {
        let query = query_map(&[
            ("view", "users"),
            ("eq_column", "id"),
            ("eq_value", "1"),
            ("sort_by", ""),
        ]);

        let plan = build_gateway_get_fetch_compatibility_plan(&query, "reporting", false)
            .expect("plan should parse");

        assert!(plan.request_body.get("sortBy").is_none());
    }
}