athena-gateway 3.18.0

Portable gateway request contracts and normalization primitives for Athena
Documentation
//! Portable compatibility parsers for legacy `/rpc/{function_name}` routes.
//!
//! These helpers normalize GET query strings and wrapped POST bodies into the
//! canonical [`GatewayRpcRequest`] DTO without depending on Actix.

use serde_json::{Map, Value};

use crate::{GatewayRpcFilter, GatewayRpcFilterOperator, GatewayRpcOrder, GatewayRpcRequest};

/// Parses the legacy POST `/rpc/{function_name}` body into a canonical RPC request.
pub fn rpc_request_from_post_compat(
    function_name: String,
    body: Value,
) -> Result<GatewayRpcRequest, String> {
    let Value::Object(object) = body else {
        return Err("POST /rpc/{function_name} expects a JSON object body".to_string());
    };

    let has_wrapper_fields = object.contains_key("args")
        || object.contains_key("filters")
        || object.contains_key("select")
        || object.contains_key("count")
        || object.contains_key("limit")
        || object.contains_key("offset")
        || object.contains_key("order")
        || object.contains_key("schema")
        || object.contains_key("function");

    if has_wrapper_fields {
        let function = object
            .get("function")
            .and_then(Value::as_str)
            .map(str::to_string)
            .unwrap_or(function_name);
        let schema = object
            .get("schema")
            .and_then(Value::as_str)
            .map(str::to_string)
            .unwrap_or_else(|| "public".to_string());
        let args = object
            .get("args")
            .cloned()
            .unwrap_or_else(|| Value::Object(Map::new()));
        let select = object
            .get("select")
            .and_then(Value::as_str)
            .map(str::to_string);
        let filters = object
            .get("filters")
            .cloned()
            .map(serde_json::from_value::<Vec<GatewayRpcFilter>>)
            .transpose()
            .map_err(|error| format!("invalid filters payload: {}", error))?
            .unwrap_or_default();
        let count = object
            .get("count")
            .and_then(Value::as_str)
            .map(str::to_string);
        let limit = object.get("limit").and_then(Value::as_i64);
        let offset = object.get("offset").and_then(Value::as_i64);
        let order = object
            .get("order")
            .cloned()
            .map(serde_json::from_value::<GatewayRpcOrder>)
            .transpose()
            .map_err(|error| format!("invalid order payload: {}", error))?;
        return Ok(GatewayRpcRequest {
            function,
            schema,
            args,
            select,
            filters,
            count,
            limit,
            offset,
            order,
        });
    }

    Ok(GatewayRpcRequest {
        function: function_name,
        schema: "public".to_string(),
        args: Value::Object(object),
        select: None,
        filters: Vec::new(),
        count: None,
        limit: None,
        offset: None,
        order: None,
    })
}

/// Parses the legacy GET `/rpc/{function_name}` query string into a canonical RPC request.
pub fn rpc_request_from_get_compat(
    function_name: String,
    query_string: &str,
) -> Result<GatewayRpcRequest, String> {
    let pairs: Vec<(String, String)> = serde_urlencoded::from_str(query_string)
        .map_err(|error| format!("failed to parse URL query parameters: {}", error))?;
    let mut args: Map<String, Value> = Map::new();
    let mut filters: Vec<GatewayRpcFilter> = Vec::new();
    let mut schema: Option<String> = None;
    let mut select: Option<String> = None;
    let mut count: Option<String> = None;
    let mut limit: Option<i64> = None;
    let mut offset: Option<i64> = None;
    let mut order: Option<GatewayRpcOrder> = None;

    for (key, value) in pairs {
        match key.as_str() {
            "schema" => schema = Some(value),
            "select" => select = Some(value),
            "count" => count = Some(value),
            "limit" => {
                limit = Some(
                    value
                        .parse::<i64>()
                        .map_err(|error| format!("invalid limit '{}': {}", value, error))?,
                )
            }
            "offset" => {
                offset = Some(
                    value
                        .parse::<i64>()
                        .map_err(|error| format!("invalid offset '{}': {}", value, error))?,
                )
            }
            "order" => {
                order = Some(parse_rpc_order(value)?);
            }
            _ => {
                if let Some((operator, parsed_value)) = parse_rpc_filter_expression(&value) {
                    filters.push(GatewayRpcFilter {
                        column: key,
                        operator,
                        value: parsed_value,
                    });
                } else {
                    args.insert(key, parse_rpc_argument_value(&value));
                }
            }
        }
    }

    Ok(GatewayRpcRequest {
        function: function_name,
        schema: schema.unwrap_or_else(|| "public".to_string()),
        args: Value::Object(args),
        select,
        filters,
        count,
        limit,
        offset,
        order,
    })
}

/// Parses a compact `column.asc` / `column.desc` order clause.
///
/// The text before the first `.` is the column and the next segment is the
/// direction; both have surrounding whitespace removed. Only `desc`
/// (case-insensitive) selects descending order — a missing direction or any
/// other value sorts ascending. Segments after the second are ignored.
///
/// # Errors
///
/// Returns an error message when the column segment is empty.
pub fn parse_rpc_order(value: String) -> Result<GatewayRpcOrder, String> {
    let mut segments = value.split('.');
    let column = segments.next().unwrap_or_default().trim();
    if column.is_empty() {
        return Err("order must specify a column".to_string());
    }
    // Trim the direction like the column, so stray whitespace around `desc`
    // (e.g. `name. desc`) does not silently flip the sort to ascending.
    let direction = segments.next().map(str::trim).unwrap_or("asc");
    let ascending = !direction.eq_ignore_ascii_case("desc");
    Ok(GatewayRpcOrder {
        column: column.to_string(),
        ascending,
    })
}

/// Parses a compact filter expression like `eq.name` or `in.(a,b)`.
pub fn parse_rpc_filter_expression(raw: &str) -> Option<(GatewayRpcFilterOperator, Value)> {
    let (operator, payload) = raw.split_once('.')?;
    let op = match operator.to_ascii_lowercase().as_str() {
        "eq" => GatewayRpcFilterOperator::Eq,
        "neq" => GatewayRpcFilterOperator::Neq,
        "gt" => GatewayRpcFilterOperator::Gt,
        "gte" => GatewayRpcFilterOperator::Gte,
        "lt" => GatewayRpcFilterOperator::Lt,
        "lte" => GatewayRpcFilterOperator::Lte,
        "in" => GatewayRpcFilterOperator::In,
        "like" => GatewayRpcFilterOperator::Like,
        "ilike" => GatewayRpcFilterOperator::ILike,
        "is" => GatewayRpcFilterOperator::Is,
        _ => return None,
    };

    let value = if matches!(op, GatewayRpcFilterOperator::In) {
        let values = payload
            .trim()
            .trim_start_matches('(')
            .trim_end_matches(')')
            .split(',')
            .filter(|segment| !segment.trim().is_empty())
            .map(|segment| parse_rpc_argument_value(segment.trim()))
            .collect::<Vec<Value>>();
        Value::Array(values)
    } else {
        parse_rpc_argument_value(payload.trim())
    };
    Some((op, value))
}

/// Parses a scalar or array literal from the compatibility RPC grammar.
///
/// After trimming surrounding whitespace the input is interpreted as, in
/// order:
///
/// 1. `{a,b,c}` — an array; the interior is split on `,`, each item is
///    trimmed, stripped of surrounding `"` characters, and parsed
///    recursively. `{}` (or a whitespace-only interior) is the empty array.
/// 2. `null`, `true`, `false` (case-insensitive) — the matching JSON literal.
/// 3. An `i64` integer.
/// 4. A finite `f64` number.
/// 5. Anything else — a JSON string holding the trimmed text.
pub fn parse_rpc_argument_value(raw: &str) -> Value {
    let trimmed = raw.trim();
    if let Some(inner) = trimmed
        .strip_prefix('{')
        .and_then(|rest| rest.strip_suffix('}'))
    {
        if inner.trim().is_empty() {
            return Value::Array(Vec::new());
        }
        let values = inner
            .split(',')
            .map(|segment| parse_rpc_argument_value(segment.trim().trim_matches('"')))
            .collect::<Vec<Value>>();
        return Value::Array(values);
    }

    if trimmed.eq_ignore_ascii_case("null") {
        return Value::Null;
    }
    if trimmed.eq_ignore_ascii_case("true") {
        return Value::Bool(true);
    }
    if trimmed.eq_ignore_ascii_case("false") {
        return Value::Bool(false);
    }
    if let Ok(integer) = trimmed.parse::<i64>() {
        return serde_json::json!(integer);
    }
    if let Ok(float) = trimmed.parse::<f64>() {
        // `f64::from_str` also accepts `inf`, `infinity`, `nan` and overflowing
        // literals such as `1e999`. JSON cannot represent non-finite numbers
        // and `json!` would turn them into `null`, so such text is kept as a
        // string instead of silently becoming null.
        if float.is_finite() {
            return serde_json::json!(float);
        }
    }
    Value::String(trimmed.to_string())
}

#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// An `eq.` prefix yields the `Eq` operator and keeps inner whitespace in the operand.
    #[test]
    fn parse_rpc_filter_expression_supports_eq() {
        let (operator, operand) = parse_rpc_filter_expression("eq.The Shire").expect("filter");
        assert!(matches!(operator, GatewayRpcFilterOperator::Eq));
        assert_eq!(operand, json!("The Shire"));
    }

    /// A brace-delimited literal becomes a JSON array of parsed scalars.
    #[test]
    fn parse_rpc_argument_value_supports_array_literals() {
        let expected = json!([1, 2, 3]);
        assert_eq!(parse_rpc_argument_value("{1,2,3}"), expected);
    }

    /// The `in.(a,b)` tuple syntax yields the `In` operator with an array operand.
    #[test]
    fn parse_rpc_filter_expression_supports_in_operator_tuple_syntax() {
        let (operator, operand) =
            parse_rpc_filter_expression("in.(The Shire,Mordor)").expect("filter");
        assert!(matches!(operator, GatewayRpcFilterOperator::In));
        assert_eq!(operand, json!(["The Shire", "Mordor"]));
    }

    /// A GET query string is split into arguments, filters, count and order.
    #[test]
    fn rpc_get_compat_parses_array_args_and_filters() {
        let query = "arr=%7B1,2,3%7D&name=eq.The%20Shire&count=exact&order=name.desc";
        let request = rpc_request_from_get_compat("plus_one".to_string(), query)
            .expect("valid GET rpc request");

        assert_eq!(request.function, "plus_one");
        assert_eq!(request.args["arr"], json!([1, 2, 3]));

        assert_eq!(request.filters.len(), 1);
        let filter = &request.filters[0];
        assert!(matches!(filter.operator, GatewayRpcFilterOperator::Eq));
        assert_eq!(filter.column, "name");
        assert_eq!(filter.value, json!("The Shire"));

        assert_eq!(request.count.as_deref(), Some("exact"));

        let order = request.order.as_ref().expect("order");
        assert_eq!(order.column, "name");
        assert!(!order.ascending);
    }

    /// A wrapped POST body populates every request field from its wrapper keys.
    #[test]
    fn rpc_post_compat_accepts_wrapped_body_shape() {
        let body = json!({
            "schema": "public",
            "args": { "name": "Athena" },
            "filters": [{ "column": "name", "operator": "eq", "value": "Athena" }],
            "select": "name",
            "count": "exact",
            "limit": 10,
            "offset": 5,
            "order": { "column": "name", "ascending": true }
        });
        let request = rpc_request_from_post_compat("hello_world".to_string(), body)
            .expect("valid POST rpc wrapper");

        assert_eq!(request.function, "hello_world");
        assert_eq!(request.schema, "public");
        assert_eq!(request.args["name"], json!("Athena"));
        assert_eq!(request.select.as_deref(), Some("name"));
        assert_eq!(request.count.as_deref(), Some("exact"));
        assert_eq!(request.limit, Some(10));
        assert_eq!(request.offset, Some(5));
        assert_eq!(request.filters.len(), 1);
        assert!(request.order.is_some());
    }
}