Skip to main content

athena_gateway/
fetch_get_compat.rs

1//! Legacy `GET /data` compatibility planning for gateway fetch reads.
2//!
3//! This module owns the request-domain translation from query parameters into
4//! the internal `/gateway/fetch` JSON body plus the associated cache-key
5//! material. Runtime adapters keep auth, cache IO, and HTTP response handling.
6
7use serde_json::{Value, json};
8use std::collections::HashMap;
9
10use crate::normalize_column_name;
11
12fn normalize_columns_csv(columns: &str, force: bool) -> String {
13    if !force {
14        return columns.to_string();
15    }
16
17    columns
18        .split(',')
19        .map(str::trim)
20        .filter(|value| !value.is_empty())
21        .map(|value| normalize_column_name(value, true))
22        .collect::<Vec<_>>()
23        .join(",")
24}
25
26fn parse_optional_i32(query: &HashMap<String, String>, key: &str) -> Option<i32> {
27    query.get(key).and_then(|value| value.parse::<i32>().ok())
28}
29
30/// Query-parameter-derived plan for the legacy `GET /data` compatibility route.
31#[derive(Debug, Clone)]
32pub struct GatewayGetFetchCompatibilityPlan {
33    /// View/table requested by the compatibility route.
34    pub view: String,
35    /// Internal fetch JSON body delegated to the POST handler.
36    pub request_body: Value,
37    /// Legacy unhashed cache key preserved for compatibility lookup.
38    pub legacy_cache_key: String,
39    /// Primary hashed cache key used by the modern GET compatibility path.
40    pub hashed_cache_key: String,
41    /// Legacy short-hash cache key still probed for compatibility.
42    pub legacy_hashed_cache_key: String,
43}
44
45/// Validation errors for legacy `GET /data` query parsing.
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47pub enum GatewayGetFetchCompatibilityError {
48    MissingView,
49    MissingEqColumn,
50    MissingEqValue,
51}
52
53impl GatewayGetFetchCompatibilityError {
54    /// Returns the public parameter name associated with this validation error.
55    pub fn parameter_name(self) -> &'static str {
56        match self {
57            Self::MissingView => "view",
58            Self::MissingEqColumn => "eq_column",
59            Self::MissingEqValue => "eq_value",
60        }
61    }
62}
63
64/// Builds the compatibility fetch plan for `GET /data`.
65pub fn build_gateway_get_fetch_compatibility_plan(
66    query: &HashMap<String, String>,
67    client_name: &str,
68    force_camel_case_to_snake_case: bool,
69) -> Result<GatewayGetFetchCompatibilityPlan, GatewayGetFetchCompatibilityError> {
70    let view = query
71        .get("view")
72        .cloned()
73        .ok_or(GatewayGetFetchCompatibilityError::MissingView)?;
74    let eq_column = query
75        .get("eq_column")
76        .cloned()
77        .ok_or(GatewayGetFetchCompatibilityError::MissingEqColumn)?;
78    let eq_value = query
79        .get("eq_value")
80        .cloned()
81        .ok_or(GatewayGetFetchCompatibilityError::MissingEqValue)?;
82
83    let columns = query.get("columns").cloned();
84    let limit = parse_optional_i32(query, "limit");
85    let current_page = parse_optional_i32(query, "current_page");
86    let page_size = parse_optional_i32(query, "page_size");
87    let offset = parse_optional_i32(query, "offset");
88    let total_pages = parse_optional_i32(query, "total_pages");
89    let strip_nulls = query
90        .get("strip_nulls")
91        .and_then(|value| value.parse::<bool>().ok())
92        .unwrap_or(false);
93    let sort_by = query.get("sort_by").cloned();
94    let sort_direction = query.get("sort_direction").cloned();
95
96    let normalized_columns_param = columns
97        .as_ref()
98        .map(|cols| normalize_columns_csv(cols, force_camel_case_to_snake_case));
99
100    let legacy_cache_key = format!(
101        "get_data_route:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}",
102        view,
103        eq_column,
104        eq_value,
105        normalized_columns_param.as_deref().unwrap_or(""),
106        limit.unwrap_or(0),
107        current_page.unwrap_or(1),
108        page_size.unwrap_or(100),
109        offset.unwrap_or(0),
110        total_pages.unwrap_or(1),
111        strip_nulls,
112        sort_by.as_deref().unwrap_or(""),
113        sort_direction.as_deref().unwrap_or(""),
114        client_name
115    );
116
117    let get_hash_input = json!({
118        "view": view,
119        "eq_column": eq_column,
120        "eq_value": eq_value,
121        "columns": normalized_columns_param.clone(),
122        "limit": limit,
123        "current_page": current_page,
124        "page_size": page_size,
125        "offset": offset,
126        "total_pages": total_pages,
127        "strip_nulls": strip_nulls,
128        "sort_by": sort_by,
129        "sort_direction": sort_direction,
130        "client": client_name,
131    });
132    let get_hash_str = sha256::digest(serde_json::to_string(&get_hash_input).unwrap_or_default());
133    let hashed_cache_key = format!(
134        "{}-h{}",
135        legacy_cache_key,
136        &get_hash_str[..16.min(get_hash_str.len())]
137    );
138    let legacy_hashed_cache_key = format!("{}-h{}", legacy_cache_key, &get_hash_str[..8]);
139
140    let condition = json!({
141        "eq_column": eq_column,
142        "eq_value": eq_value
143    });
144
145    let mut request_body = json!({
146        "table_name": view,
147        "conditions": [condition]
148    });
149
150    if let Some(cols) = normalized_columns_param {
151        request_body["columns"] = json!(cols);
152    }
153    if let Some(value) = limit {
154        request_body["limit"] = json!(value);
155    }
156    if let Some(value) = current_page {
157        request_body["current_page"] = json!(value);
158    }
159    if let Some(value) = page_size {
160        request_body["page_size"] = json!(value);
161    }
162    if let Some(value) = offset {
163        request_body["offset"] = json!(value);
164    }
165    if let Some(value) = total_pages {
166        request_body["total_pages"] = json!(value);
167    }
168    if strip_nulls {
169        request_body["strip_nulls"] = json!(true);
170    }
171    if let Some(ref column) = sort_by
172        && !column.is_empty()
173    {
174        let direction = match sort_direction
175            .as_deref()
176            .unwrap_or("asc")
177            .to_ascii_lowercase()
178            .as_str()
179        {
180            "desc" | "descending" => "desc",
181            _ => "asc",
182        };
183        request_body["sortBy"] = json!({ "field": column, "direction": direction });
184    }
185
186    Ok(GatewayGetFetchCompatibilityPlan {
187        view,
188        request_body,
189        legacy_cache_key,
190        hashed_cache_key,
191        legacy_hashed_cache_key,
192    })
193}
194
195#[cfg(test)]
196mod tests {
197    use super::{GatewayGetFetchCompatibilityError, build_gateway_get_fetch_compatibility_plan};
198    use serde_json::json;
199    use std::collections::HashMap;
200
201    fn query_map(entries: &[(&str, &str)]) -> HashMap<String, String> {
202        entries
203            .iter()
204            .map(|(key, value)| ((*key).to_string(), (*value).to_string()))
205            .collect()
206    }
207
208    #[test]
209    fn compatibility_plan_requires_view() {
210        let err = build_gateway_get_fetch_compatibility_plan(&HashMap::new(), "reporting", false)
211            .expect_err("missing view should fail");
212        assert_eq!(err, GatewayGetFetchCompatibilityError::MissingView);
213        assert_eq!(err.parameter_name(), "view");
214    }
215
216    #[test]
217    fn compatibility_plan_requires_eq_column() {
218        let query = query_map(&[("view", "users")]);
219        let err = build_gateway_get_fetch_compatibility_plan(&query, "reporting", false)
220            .expect_err("missing eq_column should fail");
221        assert_eq!(err, GatewayGetFetchCompatibilityError::MissingEqColumn);
222    }
223
224    #[test]
225    fn compatibility_plan_builds_request_body_and_cache_keys() {
226        let query = query_map(&[
227            ("view", "users"),
228            ("eq_column", "workspaceId"),
229            ("eq_value", "wk_123"),
230            ("columns", "id,displayName"),
231            ("limit", "25"),
232            ("current_page", "3"),
233            ("page_size", "50"),
234            ("offset", "4"),
235            ("total_pages", "9"),
236            ("strip_nulls", "true"),
237            ("sort_by", "displayName"),
238            ("sort_direction", "descending"),
239        ]);
240
241        let plan = build_gateway_get_fetch_compatibility_plan(&query, "reporting", true)
242            .expect("plan should parse");
243
244        assert_eq!(plan.view, "users");
245        assert_eq!(
246            plan.request_body,
247            json!({
248                "table_name": "users",
249                "conditions": [
250                    { "eq_column": "workspaceId", "eq_value": "wk_123" }
251                ],
252                "columns": "id,display_name",
253                "limit": 25,
254                "current_page": 3,
255                "page_size": 50,
256                "offset": 4,
257                "total_pages": 9,
258                "strip_nulls": true,
259                "sortBy": { "field": "displayName", "direction": "desc" }
260            })
261        );
262        assert_eq!(
263            plan.legacy_cache_key,
264            "get_data_route:users:workspaceId:wk_123:id,display_name:25:3:50:4:9:true:displayName:descending:reporting"
265        );
266        assert!(plan.hashed_cache_key.starts_with(&plan.legacy_cache_key));
267        assert!(
268            plan.legacy_hashed_cache_key
269                .starts_with(&plan.legacy_cache_key)
270        );
271        assert_ne!(plan.hashed_cache_key, plan.legacy_hashed_cache_key);
272    }
273
274    #[test]
275    fn compatibility_plan_omits_empty_sort_body() {
276        let query = query_map(&[
277            ("view", "users"),
278            ("eq_column", "id"),
279            ("eq_value", "1"),
280            ("sort_by", ""),
281        ]);
282
283        let plan = build_gateway_get_fetch_compatibility_plan(&query, "reporting", false)
284            .expect("plan should parse");
285
286        assert!(plan.request_body.get("sortBy").is_none());
287    }
288}