1use athena_query::postgres_types::where_cast_for_column;
7use athena_query::query_builder::{Condition, ConditionOperator};
8use serde_json::Value;
9use serde_urlencoded::from_str;
10use std::collections::HashMap;
11
12use crate::normalize_column_name;
13
14#[derive(Debug, Clone, PartialEq)]
16pub struct PostgrestQuery {
17 pub columns: Vec<String>,
18 pub filters: Vec<PostgrestFilter>,
19 pub or_filters: Vec<Vec<PostgrestFilter>>,
20 pub limit: Option<i64>,
21 pub offset: Option<i64>,
22 pub order: Option<OrderSpec>,
23}
24
25#[derive(Debug, Clone, PartialEq)]
27pub struct PostgrestFilter {
28 pub column: String,
29 pub operator: PostgrestFilterOperator,
30 pub values: Vec<Value>,
31 pub negated: bool,
32}
33
34#[derive(Debug, Clone, Copy, PartialEq, Eq)]
36pub enum PostgrestFilterOperator {
37 Eq,
38 Neq,
39 Gt,
40 Lt,
41 Gte,
42 Lte,
43 Like,
44 ILike,
45 Is,
46 In,
47 Contains,
48 Contained,
49}
50
51#[derive(Debug, Clone, PartialEq, Eq)]
53pub struct OrderSpec {
54 pub column: String,
55 pub ascending: bool,
56}
57
58pub fn parse_postgrest_query(
61 query_string: &str,
62 range_header: Option<&str>,
63 force_snake_case: bool,
64) -> Result<PostgrestQuery, String> {
65 let mut columns: Vec<String> = vec!["*".to_string()];
66 let mut filters: Vec<PostgrestFilter> = Vec::new();
67 let mut or_filters: Vec<Vec<PostgrestFilter>> = Vec::new();
68 let mut limit: Option<i64> = None;
69 let mut offset: Option<i64> = None;
70 let mut order: Option<OrderSpec> = None;
71
72 let query_pairs: Vec<(String, String)> = from_str::<Vec<(String, String)>>(query_string)
73 .map_err(|err| format!("failed to parse query string: {err}"))?;
74
75 for (key, value) in query_pairs {
76 match key.as_str() {
77 "select" => {
78 let parsed: Vec<String> = value
79 .split(',')
80 .map(|part| part.trim().to_string())
81 .filter(|part| !part.is_empty())
82 .collect();
83 if !parsed.is_empty() {
84 columns = parsed;
85 }
86 }
87 "limit" => {
88 if let Ok(parsed) = value.parse::<i64>() {
89 limit = Some(parsed);
90 }
91 }
92 "offset" => {
93 if let Ok(parsed) = value.parse::<i64>() {
94 offset = Some(parsed);
95 }
96 }
97 "order" => {
98 if let Some(spec) = parse_order(&value, force_snake_case) {
99 order = Some(spec);
100 }
101 }
102 "or" => {
103 if let Some(parsed) = parse_or_filter(&value, force_snake_case)
104 && !parsed.is_empty()
105 {
106 or_filters.push(parsed);
107 }
108 }
109 other => {
110 if let Some(filter) = parse_filter(other, &value, force_snake_case) {
111 filters.push(filter);
112 }
113 }
114 }
115 }
116
117 if let Some((start, end)) = range_header.and_then(parse_range_header_value)
118 && end >= start
119 {
120 offset = Some(start);
121 limit = Some(end - start + 1);
122 }
123
124 Ok(PostgrestQuery {
125 columns,
126 filters,
127 or_filters,
128 limit,
129 offset,
130 order,
131 })
132}
133
134pub fn convert_postgrest_filters_to_conditions(
136 filters: &[PostgrestFilter],
137 auto_cast_uuid_filter_values_to_text: bool,
138 column_types: Option<&HashMap<String, String>>,
139) -> Vec<Condition> {
140 filters
141 .iter()
142 .filter_map(|filter| {
143 convert_postgrest_filter_to_condition(
144 filter,
145 auto_cast_uuid_filter_values_to_text,
146 column_types,
147 )
148 })
149 .collect()
150}
151
152pub fn convert_postgrest_or_filter_groups_to_conditions(
154 or_groups: &[Vec<PostgrestFilter>],
155 auto_cast_uuid_filter_values_to_text: bool,
156 column_types: Option<&HashMap<String, String>>,
157) -> Vec<Vec<Condition>> {
158 or_groups
159 .iter()
160 .map(|group| {
161 convert_postgrest_filters_to_conditions(
162 group,
163 auto_cast_uuid_filter_values_to_text,
164 column_types,
165 )
166 })
167 .filter(|group| !group.is_empty())
168 .collect()
169}
170
171pub fn convert_postgrest_filter_to_condition(
173 filter: &PostgrestFilter,
174 auto_cast_uuid_filter_values_to_text: bool,
175 column_types: Option<&HashMap<String, String>>,
176) -> Option<Condition> {
177 let cast: Option<&'static str> = where_cast_for_column(&filter.column, column_types);
178 Some(
179 Condition::new(
180 filter.column.clone(),
181 map_filter_operator(filter.operator),
182 filter.values.clone(),
183 filter.negated,
184 )
185 .with_uuid_value_text_cast(auto_cast_uuid_filter_values_to_text)
186 .with_pg_cast(cast),
187 )
188}
189
190fn map_filter_operator(op: PostgrestFilterOperator) -> ConditionOperator {
191 match op {
192 PostgrestFilterOperator::Eq => ConditionOperator::Eq,
193 PostgrestFilterOperator::Neq => ConditionOperator::Neq,
194 PostgrestFilterOperator::Gt => ConditionOperator::Gt,
195 PostgrestFilterOperator::Lt => ConditionOperator::Lt,
196 PostgrestFilterOperator::Gte => ConditionOperator::Gte,
197 PostgrestFilterOperator::Lte => ConditionOperator::Lte,
198 PostgrestFilterOperator::Like => ConditionOperator::Like,
199 PostgrestFilterOperator::ILike => ConditionOperator::ILike,
200 PostgrestFilterOperator::Is => ConditionOperator::Is,
201 PostgrestFilterOperator::In => ConditionOperator::In,
202 PostgrestFilterOperator::Contains => ConditionOperator::Contains,
203 PostgrestFilterOperator::Contained => ConditionOperator::Contained,
204 }
205}
206
207fn parse_order(value: &str, force_snake_case: bool) -> Option<OrderSpec> {
208 let trimmed = value.trim();
209 let (raw_column, ascending) = if let Some(column) = trimmed.strip_suffix(".asc") {
210 (column, true)
211 } else if let Some(column) = trimmed.strip_suffix(".desc") {
212 (column, false)
213 } else {
214 (trimmed, true)
215 };
216 let column: String = if force_snake_case {
217 normalize_column_name(raw_column, true)
218 } else {
219 raw_column.to_ascii_lowercase()
220 };
221 if column.is_empty() {
222 return None;
223 }
224 Some(OrderSpec { column, ascending })
225}
226
227fn parse_or_filter(value: &str, force_snake_case: bool) -> Option<Vec<PostgrestFilter>> {
228 let trimmed: &str = value.trim();
229 let inner: &str = trimmed.trim_start_matches('(').trim_end_matches(')').trim();
230
231 if inner.is_empty() {
232 return None;
233 }
234
235 let mut filters: Vec<PostgrestFilter> = Vec::new();
236 for expression in inner.split(',') {
237 let expression: &str = expression.trim();
238 if expression.is_empty() {
239 continue;
240 }
241 if let Some((column, _remainder)) = expression.split_once('.')
242 && let Some(filter) = parse_filter(
243 column,
244 expression
245 .strip_prefix(&format!("{}.", column))
246 .unwrap_or(""),
247 force_snake_case,
248 )
249 {
250 filters.push(filter);
251 }
252 }
253
254 if filters.is_empty() {
255 None
256 } else {
257 Some(filters)
258 }
259}
260
261fn parse_filter(column: &str, expression: &str, force_snake_case: bool) -> Option<PostgrestFilter> {
262 let normalized_column: String = if force_snake_case {
263 normalize_column_name(column, true)
264 } else {
265 column.to_string()
266 };
267
268 if normalized_column.is_empty() {
269 return None;
270 }
271
272 let (negated, expr) = if let Some(stripped) = expression.strip_prefix("not.") {
273 (true, stripped)
274 } else {
275 (false, expression)
276 };
277
278 let (operator_str, value_str) = if let Some((op, rest)) = expr.split_once('.') {
279 (op, rest)
280 } else {
281 return None;
282 };
283
284 let operator: PostgrestFilterOperator = match operator_str.to_lowercase().as_str() {
285 "eq" => PostgrestFilterOperator::Eq,
286 "neq" => PostgrestFilterOperator::Neq,
287 "gt" => PostgrestFilterOperator::Gt,
288 "lt" => PostgrestFilterOperator::Lt,
289 "gte" => PostgrestFilterOperator::Gte,
290 "lte" => PostgrestFilterOperator::Lte,
291 "like" => PostgrestFilterOperator::Like,
292 "ilike" => PostgrestFilterOperator::ILike,
293 "is" => PostgrestFilterOperator::Is,
294 "in" => PostgrestFilterOperator::In,
295 "cs" => PostgrestFilterOperator::Contains,
296 "cd" => PostgrestFilterOperator::Contained,
297 other => {
298 if let Some(stripped) = other.strip_prefix("array_") {
299 match stripped {
300 "contains" => PostgrestFilterOperator::Contains,
301 "contained" => PostgrestFilterOperator::Contained,
302 _ => PostgrestFilterOperator::Eq,
303 }
304 } else {
305 PostgrestFilterOperator::Eq
306 }
307 }
308 };
309
310 let values: Vec<Value> = match operator {
311 PostgrestFilterOperator::In => parse_in_values(value_str),
312 PostgrestFilterOperator::Contains | PostgrestFilterOperator::Contained => {
313 vec![parse_array_filter(value_str)]
314 }
315 PostgrestFilterOperator::Is => vec![parse_scalar_value(value_str)],
316 _ => vec![parse_scalar_value(value_str)],
317 };
318
319 Some(PostgrestFilter {
320 column: normalized_column,
321 operator,
322 values,
323 negated,
324 })
325}
326
327fn parse_in_values(value: &str) -> Vec<Value> {
328 let trimmed: &str = value.trim().trim_start_matches('(').trim_end_matches(')');
329 trimmed
330 .split(',')
331 .map(|part| parse_scalar_value(part.trim()))
332 .collect()
333}
334
335fn parse_array_filter(value: &str) -> Value {
336 let trimmed: &str = value.trim().trim_start_matches('.').trim();
337 let inner: &str = trimmed.trim_start_matches('{').trim_end_matches('}').trim();
338 let elements: Vec<Value> = inner
339 .split(',')
340 .map(|part| parse_scalar_value(part.trim()))
341 .collect();
342 Value::Array(elements)
343}
344
345fn parse_scalar_value(value: &str) -> Value {
346 let lowered: String = value.to_lowercase();
347 if lowered.is_empty() {
348 return Value::String(String::new());
349 }
350
351 if lowered == "null" {
352 return Value::Null;
353 }
354
355 if lowered == "true" {
356 return Value::Bool(true);
357 }
358
359 if lowered == "false" {
360 return Value::Bool(false);
361 }
362
363 if let Ok(int_value) = value.parse::<i64>() {
364 return Value::Number(int_value.into());
365 }
366
367 if let Ok(float_value) = value.parse::<f64>()
368 && let Some(number) = serde_json::Number::from_f64(float_value)
369 {
370 return Value::Number(number);
371 }
372
373 Value::String(value.replace('*', "%"))
374}
375
376fn parse_range_header_value(header_value: &str) -> Option<(i64, i64)> {
377 let cleaned: &str = header_value.trim().trim_start_matches("items=");
378 let mut parts = cleaned.split('-');
379 let start: i64 = parts.next()?.trim().parse::<i64>().ok()?;
380 let end: i64 = parts.next()?.trim().parse::<i64>().ok()?;
381 Some((start, end))
382}
383
384#[cfg(test)]
385mod tests {
386 use super::*;
387
388 #[test]
389 fn parse_postgrest_query_parses_select_filters_order_and_range() {
390 let parsed = parse_postgrest_query(
391 "select=id,name&active=eq.true&score=gte.42&order=created_at.desc",
392 Some("items=10-24"),
393 false,
394 )
395 .expect("query");
396
397 assert_eq!(parsed.columns, vec!["id", "name"]);
398 assert_eq!(parsed.limit, Some(15));
399 assert_eq!(parsed.offset, Some(10));
400 assert_eq!(
401 parsed.order,
402 Some(OrderSpec {
403 column: "created_at".to_string(),
404 ascending: false,
405 })
406 );
407 assert_eq!(parsed.filters.len(), 2);
408 assert_eq!(parsed.filters[0].column, "active");
409 assert_eq!(parsed.filters[1].values[0], Value::Number(42.into()));
410 }
411
412 #[test]
413 fn parse_postgrest_query_normalizes_columns_when_forced() {
414 let parsed = parse_postgrest_query("organizationId=eq.123&order=createdAt.asc", None, true)
415 .expect("query");
416
417 assert_eq!(parsed.filters[0].column, "organization_id");
418 assert_eq!(
419 parsed.order,
420 Some(OrderSpec {
421 column: "created_at".to_string(),
422 ascending: true,
423 })
424 );
425 }
426
427 #[test]
428 fn parse_postgrest_query_supports_or_and_array_filters() {
429 let parsed = parse_postgrest_query(
430 "or=(status.eq.active,status.eq.pending)&tags=cs.{alpha,beta}",
431 None,
432 false,
433 )
434 .expect("query");
435
436 assert_eq!(parsed.or_filters.len(), 1);
437 assert_eq!(parsed.or_filters[0].len(), 2);
438 assert_eq!(
439 parsed.filters[0].operator,
440 PostgrestFilterOperator::Contains
441 );
442 assert_eq!(
443 parsed.filters[0].values,
444 vec![Value::Array(vec![
445 Value::String("alpha".to_string()),
446 Value::String("beta".to_string()),
447 ])]
448 );
449 }
450
451 #[test]
452 fn convert_postgrest_filters_to_conditions_preserves_operator_and_pg_cast() {
453 let filters = vec![PostgrestFilter {
454 column: "cache_hit_ratio".to_string(),
455 operator: PostgrestFilterOperator::Gte,
456 values: vec![Value::String("0.75".to_string())],
457 negated: false,
458 }];
459 let column_types = HashMap::from([(
460 "cache_hit_ratio".to_string(),
461 "double precision|float8".to_string(),
462 )]);
463
464 let conditions =
465 convert_postgrest_filters_to_conditions(&filters, true, Some(&column_types));
466
467 assert_eq!(conditions.len(), 1);
468 assert_eq!(conditions[0].operator, ConditionOperator::Gte);
469 assert_eq!(conditions[0].pg_cast.as_deref(), Some("float8"));
470 assert!(conditions[0].auto_cast_uuid_value_to_text);
471 }
472}