Skip to main content

athena_gateway/
postgrest.rs

1//! Portable PostgREST compatibility parsing for Athena's legacy REST shim.
2//!
3//! The parser operates on plain query-string and header values so the grammar
4//! can be reused without pulling `actix_web::HttpRequest` into this crate.
5
6use athena_query::postgres_types::where_cast_for_column;
7use athena_query::query_builder::{Condition, ConditionOperator};
8use serde_json::Value;
9use serde_urlencoded::from_str;
10use std::collections::HashMap;
11
12use crate::normalize_column_name;
13
14/// Normalized PostgREST query components derived from a request URL.
15#[derive(Debug, Clone, PartialEq)]
16pub struct PostgrestQuery {
17    pub columns: Vec<String>,
18    pub filters: Vec<PostgrestFilter>,
19    pub or_filters: Vec<Vec<PostgrestFilter>>,
20    pub limit: Option<i64>,
21    pub offset: Option<i64>,
22    pub order: Option<OrderSpec>,
23}
24
25/// One parsed PostgREST filter expression.
26#[derive(Debug, Clone, PartialEq)]
27pub struct PostgrestFilter {
28    pub column: String,
29    pub operator: PostgrestFilterOperator,
30    pub values: Vec<Value>,
31    pub negated: bool,
32}
33
/// Comparison operators the legacy REST shim recognizes in filter expressions.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PostgrestFilterOperator {
    /// `eq`; also used when the operator token is not recognized.
    Eq,
    /// `neq`.
    Neq,
    /// `gt`.
    Gt,
    /// `lt`.
    Lt,
    /// `gte`.
    Gte,
    /// `lte`.
    Lte,
    /// `like`.
    Like,
    /// `ilike`.
    ILike,
    /// `is`.
    Is,
    /// `in`, with a parenthesized, comma-separated operand list.
    In,
    /// `cs` or `array_contains`.
    Contains,
    /// `cd` or `array_contained`.
    Contained,
}
50
/// Single-column ordering taken from the `order` query parameter.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct OrderSpec {
    /// Column to sort by.
    pub column: String,
    /// `true` for ascending order (the default when no direction suffix is
    /// given), `false` for a `.desc` suffix.
    pub ascending: bool,
}
57
58/// Parses a Supabase/PostgREST query into a normalized representation that can
59/// be mapped to Athena/Postgres operations.
60pub fn parse_postgrest_query(
61    query_string: &str,
62    range_header: Option<&str>,
63    force_snake_case: bool,
64) -> Result<PostgrestQuery, String> {
65    let mut columns: Vec<String> = vec!["*".to_string()];
66    let mut filters: Vec<PostgrestFilter> = Vec::new();
67    let mut or_filters: Vec<Vec<PostgrestFilter>> = Vec::new();
68    let mut limit: Option<i64> = None;
69    let mut offset: Option<i64> = None;
70    let mut order: Option<OrderSpec> = None;
71
72    let query_pairs: Vec<(String, String)> = from_str::<Vec<(String, String)>>(query_string)
73        .map_err(|err| format!("failed to parse query string: {err}"))?;
74
75    for (key, value) in query_pairs {
76        match key.as_str() {
77            "select" => {
78                let parsed: Vec<String> = value
79                    .split(',')
80                    .map(|part| part.trim().to_string())
81                    .filter(|part| !part.is_empty())
82                    .collect();
83                if !parsed.is_empty() {
84                    columns = parsed;
85                }
86            }
87            "limit" => {
88                if let Ok(parsed) = value.parse::<i64>() {
89                    limit = Some(parsed);
90                }
91            }
92            "offset" => {
93                if let Ok(parsed) = value.parse::<i64>() {
94                    offset = Some(parsed);
95                }
96            }
97            "order" => {
98                if let Some(spec) = parse_order(&value, force_snake_case) {
99                    order = Some(spec);
100                }
101            }
102            "or" => {
103                if let Some(parsed) = parse_or_filter(&value, force_snake_case)
104                    && !parsed.is_empty()
105                {
106                    or_filters.push(parsed);
107                }
108            }
109            other => {
110                if let Some(filter) = parse_filter(other, &value, force_snake_case) {
111                    filters.push(filter);
112                }
113            }
114        }
115    }
116
117    if let Some((start, end)) = range_header.and_then(parse_range_header_value)
118        && end >= start
119    {
120        offset = Some(start);
121        limit = Some(end - start + 1);
122    }
123
124    Ok(PostgrestQuery {
125        columns,
126        filters,
127        or_filters,
128        limit,
129        offset,
130        order,
131    })
132}
133
134/// Converts parsed PostgREST filters into Athena query-builder conditions.
135pub fn convert_postgrest_filters_to_conditions(
136    filters: &[PostgrestFilter],
137    auto_cast_uuid_filter_values_to_text: bool,
138    column_types: Option<&HashMap<String, String>>,
139) -> Vec<Condition> {
140    filters
141        .iter()
142        .filter_map(|filter| {
143            convert_postgrest_filter_to_condition(
144                filter,
145                auto_cast_uuid_filter_values_to_text,
146                column_types,
147            )
148        })
149        .collect()
150}
151
152/// Converts OR filter groups into Athena query-builder condition groups.
153pub fn convert_postgrest_or_filter_groups_to_conditions(
154    or_groups: &[Vec<PostgrestFilter>],
155    auto_cast_uuid_filter_values_to_text: bool,
156    column_types: Option<&HashMap<String, String>>,
157) -> Vec<Vec<Condition>> {
158    or_groups
159        .iter()
160        .map(|group| {
161            convert_postgrest_filters_to_conditions(
162                group,
163                auto_cast_uuid_filter_values_to_text,
164                column_types,
165            )
166        })
167        .filter(|group| !group.is_empty())
168        .collect()
169}
170
171/// Converts one parsed PostgREST filter into an Athena query condition.
172pub fn convert_postgrest_filter_to_condition(
173    filter: &PostgrestFilter,
174    auto_cast_uuid_filter_values_to_text: bool,
175    column_types: Option<&HashMap<String, String>>,
176) -> Option<Condition> {
177    let cast: Option<&'static str> = where_cast_for_column(&filter.column, column_types);
178    Some(
179        Condition::new(
180            filter.column.clone(),
181            map_filter_operator(filter.operator),
182            filter.values.clone(),
183            filter.negated,
184        )
185        .with_uuid_value_text_cast(auto_cast_uuid_filter_values_to_text)
186        .with_pg_cast(cast),
187    )
188}
189
190fn map_filter_operator(op: PostgrestFilterOperator) -> ConditionOperator {
191    match op {
192        PostgrestFilterOperator::Eq => ConditionOperator::Eq,
193        PostgrestFilterOperator::Neq => ConditionOperator::Neq,
194        PostgrestFilterOperator::Gt => ConditionOperator::Gt,
195        PostgrestFilterOperator::Lt => ConditionOperator::Lt,
196        PostgrestFilterOperator::Gte => ConditionOperator::Gte,
197        PostgrestFilterOperator::Lte => ConditionOperator::Lte,
198        PostgrestFilterOperator::Like => ConditionOperator::Like,
199        PostgrestFilterOperator::ILike => ConditionOperator::ILike,
200        PostgrestFilterOperator::Is => ConditionOperator::Is,
201        PostgrestFilterOperator::In => ConditionOperator::In,
202        PostgrestFilterOperator::Contains => ConditionOperator::Contains,
203        PostgrestFilterOperator::Contained => ConditionOperator::Contained,
204    }
205}
206
207fn parse_order(value: &str, force_snake_case: bool) -> Option<OrderSpec> {
208    let trimmed = value.trim();
209    let (raw_column, ascending) = if let Some(column) = trimmed.strip_suffix(".asc") {
210        (column, true)
211    } else if let Some(column) = trimmed.strip_suffix(".desc") {
212        (column, false)
213    } else {
214        (trimmed, true)
215    };
216    let column: String = if force_snake_case {
217        normalize_column_name(raw_column, true)
218    } else {
219        raw_column.to_ascii_lowercase()
220    };
221    if column.is_empty() {
222        return None;
223    }
224    Some(OrderSpec { column, ascending })
225}
226
227fn parse_or_filter(value: &str, force_snake_case: bool) -> Option<Vec<PostgrestFilter>> {
228    let trimmed: &str = value.trim();
229    let inner: &str = trimmed.trim_start_matches('(').trim_end_matches(')').trim();
230
231    if inner.is_empty() {
232        return None;
233    }
234
235    let mut filters: Vec<PostgrestFilter> = Vec::new();
236    for expression in inner.split(',') {
237        let expression: &str = expression.trim();
238        if expression.is_empty() {
239            continue;
240        }
241        if let Some((column, _remainder)) = expression.split_once('.')
242            && let Some(filter) = parse_filter(
243                column,
244                expression
245                    .strip_prefix(&format!("{}.", column))
246                    .unwrap_or(""),
247                force_snake_case,
248            )
249        {
250            filters.push(filter);
251        }
252    }
253
254    if filters.is_empty() {
255        None
256    } else {
257        Some(filters)
258    }
259}
260
261fn parse_filter(column: &str, expression: &str, force_snake_case: bool) -> Option<PostgrestFilter> {
262    let normalized_column: String = if force_snake_case {
263        normalize_column_name(column, true)
264    } else {
265        column.to_string()
266    };
267
268    if normalized_column.is_empty() {
269        return None;
270    }
271
272    let (negated, expr) = if let Some(stripped) = expression.strip_prefix("not.") {
273        (true, stripped)
274    } else {
275        (false, expression)
276    };
277
278    let (operator_str, value_str) = if let Some((op, rest)) = expr.split_once('.') {
279        (op, rest)
280    } else {
281        return None;
282    };
283
284    let operator: PostgrestFilterOperator = match operator_str.to_lowercase().as_str() {
285        "eq" => PostgrestFilterOperator::Eq,
286        "neq" => PostgrestFilterOperator::Neq,
287        "gt" => PostgrestFilterOperator::Gt,
288        "lt" => PostgrestFilterOperator::Lt,
289        "gte" => PostgrestFilterOperator::Gte,
290        "lte" => PostgrestFilterOperator::Lte,
291        "like" => PostgrestFilterOperator::Like,
292        "ilike" => PostgrestFilterOperator::ILike,
293        "is" => PostgrestFilterOperator::Is,
294        "in" => PostgrestFilterOperator::In,
295        "cs" => PostgrestFilterOperator::Contains,
296        "cd" => PostgrestFilterOperator::Contained,
297        other => {
298            if let Some(stripped) = other.strip_prefix("array_") {
299                match stripped {
300                    "contains" => PostgrestFilterOperator::Contains,
301                    "contained" => PostgrestFilterOperator::Contained,
302                    _ => PostgrestFilterOperator::Eq,
303                }
304            } else {
305                PostgrestFilterOperator::Eq
306            }
307        }
308    };
309
310    let values: Vec<Value> = match operator {
311        PostgrestFilterOperator::In => parse_in_values(value_str),
312        PostgrestFilterOperator::Contains | PostgrestFilterOperator::Contained => {
313            vec![parse_array_filter(value_str)]
314        }
315        PostgrestFilterOperator::Is => vec![parse_scalar_value(value_str)],
316        _ => vec![parse_scalar_value(value_str)],
317    };
318
319    Some(PostgrestFilter {
320        column: normalized_column,
321        operator,
322        values,
323        negated,
324    })
325}
326
327fn parse_in_values(value: &str) -> Vec<Value> {
328    let trimmed: &str = value.trim().trim_start_matches('(').trim_end_matches(')');
329    trimmed
330        .split(',')
331        .map(|part| parse_scalar_value(part.trim()))
332        .collect()
333}
334
335fn parse_array_filter(value: &str) -> Value {
336    let trimmed: &str = value.trim().trim_start_matches('.').trim();
337    let inner: &str = trimmed.trim_start_matches('{').trim_end_matches('}').trim();
338    let elements: Vec<Value> = inner
339        .split(',')
340        .map(|part| parse_scalar_value(part.trim()))
341        .collect();
342    Value::Array(elements)
343}
344
345fn parse_scalar_value(value: &str) -> Value {
346    let lowered: String = value.to_lowercase();
347    if lowered.is_empty() {
348        return Value::String(String::new());
349    }
350
351    if lowered == "null" {
352        return Value::Null;
353    }
354
355    if lowered == "true" {
356        return Value::Bool(true);
357    }
358
359    if lowered == "false" {
360        return Value::Bool(false);
361    }
362
363    if let Ok(int_value) = value.parse::<i64>() {
364        return Value::Number(int_value.into());
365    }
366
367    if let Ok(float_value) = value.parse::<f64>()
368        && let Some(number) = serde_json::Number::from_f64(float_value)
369    {
370        return Value::Number(number);
371    }
372
373    Value::String(value.replace('*', "%"))
374}
375
/// Parses a row range of the form `start-end`, optionally prefixed with
/// `items=`, into `(start, end)`.
///
/// Whitespace around the header and around each bound is ignored. Returns
/// `None` for anything else: a missing bound, extra `-` segments such as
/// `0-9-100`, a repeated prefix, non-numeric bounds, or a negative bound.
/// Whether `end >= start` holds is left to the caller.
fn parse_range_header_value(header_value: &str) -> Option<(i64, i64)> {
    let trimmed = header_value.trim();
    // Remove the unit prefix at most once; `trim_start_matches` would also
    // accept `items=items=...`.
    let range = trimmed.strip_prefix("items=").unwrap_or(trimmed);

    // Splitting once keeps any trailing segment inside `end`, so malformed
    // input like `0-9-100` fails to parse instead of being read as `0-9`.
    let (start, end) = range.split_once('-')?;
    let start = start.trim().parse::<i64>().ok()?;
    let end = end.trim().parse::<i64>().ok()?;

    if start < 0 || end < 0 {
        return None;
    }
    Some((start, end))
}
383
#[cfg(test)]
mod tests {
    use super::*;

    /// Shorthand for the ordering expected by an assertion.
    fn order_by(column: &str, ascending: bool) -> OrderSpec {
        OrderSpec {
            column: column.to_string(),
            ascending,
        }
    }

    #[test]
    fn parse_postgrest_query_parses_select_filters_order_and_range() {
        let query = parse_postgrest_query(
            "select=id,name&active=eq.true&score=gte.42&order=created_at.desc",
            Some("items=10-24"),
            false,
        )
        .expect("query");

        assert_eq!(query.columns, vec!["id", "name"]);
        // Rows 10..=24 are 15 rows starting at offset 10.
        assert_eq!(query.limit, Some(15));
        assert_eq!(query.offset, Some(10));
        assert_eq!(query.order, Some(order_by("created_at", false)));

        assert_eq!(query.filters.len(), 2);
        assert_eq!(query.filters[0].column, "active");
        assert_eq!(query.filters[1].values[0], Value::Number(42.into()));
    }

    #[test]
    fn parse_postgrest_query_normalizes_columns_when_forced() {
        let query_string = "organizationId=eq.123&order=createdAt.asc";
        let query = parse_postgrest_query(query_string, None, true).expect("query");

        assert_eq!(query.filters[0].column, "organization_id");
        assert_eq!(query.order, Some(order_by("created_at", true)));
    }

    #[test]
    fn parse_postgrest_query_supports_or_and_array_filters() {
        let query = parse_postgrest_query(
            "or=(status.eq.active,status.eq.pending)&tags=cs.{alpha,beta}",
            None,
            false,
        )
        .expect("query");

        // Both alternatives land in a single OR group.
        assert_eq!(query.or_filters.len(), 1);
        assert_eq!(query.or_filters[0].len(), 2);

        let tags_filter = &query.filters[0];
        let expected_tags = Value::Array(
            ["alpha", "beta"]
                .iter()
                .map(|tag| Value::String(tag.to_string()))
                .collect(),
        );
        assert_eq!(tags_filter.operator, PostgrestFilterOperator::Contains);
        assert_eq!(tags_filter.values, vec![expected_tags]);
    }

    #[test]
    fn convert_postgrest_filters_to_conditions_preserves_operator_and_pg_cast() {
        let ratio_filter = PostgrestFilter {
            column: "cache_hit_ratio".to_string(),
            operator: PostgrestFilterOperator::Gte,
            values: vec![Value::String("0.75".to_string())],
            negated: false,
        };
        let mut column_types = HashMap::new();
        column_types.insert(
            "cache_hit_ratio".to_string(),
            "double precision|float8".to_string(),
        );

        let conditions =
            convert_postgrest_filters_to_conditions(&[ratio_filter], true, Some(&column_types));

        assert_eq!(conditions.len(), 1);
        let condition = &conditions[0];
        assert_eq!(condition.operator, ConditionOperator::Gte);
        assert_eq!(condition.pg_cast.as_deref(), Some("float8"));
        assert!(condition.auto_cast_uuid_value_to_text);
    }
}
472}