use serde_json::{Map, Value};
use crate::{GatewayRpcFilter, GatewayRpcFilterOperator, GatewayRpcOrder, GatewayRpcRequest};
pub fn rpc_request_from_post_compat(
function_name: String,
body: Value,
) -> Result<GatewayRpcRequest, String> {
let Value::Object(object) = body else {
return Err("POST /rpc/{function_name} expects a JSON object body".to_string());
};
let has_wrapper_fields = object.contains_key("args")
|| object.contains_key("filters")
|| object.contains_key("select")
|| object.contains_key("count")
|| object.contains_key("limit")
|| object.contains_key("offset")
|| object.contains_key("order")
|| object.contains_key("schema")
|| object.contains_key("function");
if has_wrapper_fields {
let function = object
.get("function")
.and_then(Value::as_str)
.map(str::to_string)
.unwrap_or(function_name);
let schema = object
.get("schema")
.and_then(Value::as_str)
.map(str::to_string)
.unwrap_or_else(|| "public".to_string());
let args = object
.get("args")
.cloned()
.unwrap_or_else(|| Value::Object(Map::new()));
let select = object
.get("select")
.and_then(Value::as_str)
.map(str::to_string);
let filters = object
.get("filters")
.cloned()
.map(serde_json::from_value::<Vec<GatewayRpcFilter>>)
.transpose()
.map_err(|error| format!("invalid filters payload: {}", error))?
.unwrap_or_default();
let count = object
.get("count")
.and_then(Value::as_str)
.map(str::to_string);
let limit = object.get("limit").and_then(Value::as_i64);
let offset = object.get("offset").and_then(Value::as_i64);
let order = object
.get("order")
.cloned()
.map(serde_json::from_value::<GatewayRpcOrder>)
.transpose()
.map_err(|error| format!("invalid order payload: {}", error))?;
return Ok(GatewayRpcRequest {
function,
schema,
args,
select,
filters,
count,
limit,
offset,
order,
});
}
Ok(GatewayRpcRequest {
function: function_name,
schema: "public".to_string(),
args: Value::Object(object),
select: None,
filters: Vec::new(),
count: None,
limit: None,
offset: None,
order: None,
})
}
/// Builds a [`GatewayRpcRequest`] from the query string of a `GET` RPC call.
///
/// The query string is decoded into key/value pairs which are handled in order:
///
/// * `schema`, `select` and `count` are stored verbatim.
/// * `limit` and `offset` must parse as `i64`.
/// * `order` is parsed with [`parse_rpc_order`].
/// * Any other key becomes a filter on that column when its value is recognised by
///   [`parse_rpc_filter_expression`]; otherwise it becomes a function argument
///   converted with [`parse_rpc_argument_value`].
///
/// When a reserved key or an argument name appears more than once, the last
/// occurrence wins. The schema defaults to `"public"`.
///
/// # Errors
///
/// Returns an error message when the query string cannot be decoded, when `limit`
/// or `offset` is not a valid integer, or when `order` is rejected.
pub fn rpc_request_from_get_compat(
    function_name: String,
    query_string: &str,
) -> Result<GatewayRpcRequest, String> {
    let decoded = match serde_urlencoded::from_str::<Vec<(String, String)>>(query_string) {
        Ok(decoded) => decoded,
        Err(error) => {
            return Err(format!("failed to parse URL query parameters: {}", error));
        }
    };

    let mut schema_override: Option<String> = None;
    let mut projection: Option<String> = None;
    let mut count_mode: Option<String> = None;
    let mut row_limit: Option<i64> = None;
    let mut row_offset: Option<i64> = None;
    let mut ordering: Option<GatewayRpcOrder> = None;
    let mut collected_filters: Vec<GatewayRpcFilter> = Vec::new();
    let mut arguments: Map<String, Value> = Map::new();

    for (name, raw) in decoded {
        if name == "schema" {
            schema_override = Some(raw);
        } else if name == "select" {
            projection = Some(raw);
        } else if name == "count" {
            count_mode = Some(raw);
        } else if name == "limit" {
            match raw.parse::<i64>() {
                Ok(parsed) => row_limit = Some(parsed),
                Err(error) => return Err(format!("invalid limit '{}': {}", raw, error)),
            }
        } else if name == "offset" {
            match raw.parse::<i64>() {
                Ok(parsed) => row_offset = Some(parsed),
                Err(error) => return Err(format!("invalid offset '{}': {}", raw, error)),
            }
        } else if name == "order" {
            ordering = Some(parse_rpc_order(raw)?);
        } else {
            // Non-reserved key: a recognised operator prefix makes it a filter,
            // anything else is a plain function argument.
            match parse_rpc_filter_expression(&raw) {
                Some((operator, value)) => collected_filters.push(GatewayRpcFilter {
                    column: name,
                    operator,
                    value,
                }),
                None => {
                    arguments.insert(name, parse_rpc_argument_value(&raw));
                }
            }
        }
    }

    let schema = match schema_override {
        Some(schema) => schema,
        None => "public".to_string(),
    };

    Ok(GatewayRpcRequest {
        function: function_name,
        schema,
        args: Value::Object(arguments),
        select: projection,
        filters: collected_filters,
        count: count_mode,
        limit: row_limit,
        offset: row_offset,
        order: ordering,
    })
}
/// Parses a `column[.direction]` ordering expression such as `name.desc`.
///
/// The column is the text before the first `.`; the direction is the segment that
/// follows it. Only `desc` (any ASCII case) selects descending order - a missing or
/// any other direction sorts ascending. Segments after the direction are ignored.
/// Surrounding whitespace is ignored on both the column and the direction.
///
/// # Errors
///
/// Returns an error message when the column is empty after trimming.
pub fn parse_rpc_order(value: String) -> Result<GatewayRpcOrder, String> {
    let mut segments = value.split('.');

    let column = segments.next().unwrap_or_default().trim();
    if column.is_empty() {
        return Err("order must specify a column".to_string());
    }

    // Trim the direction just like the column, so stray whitespace around `desc`
    // does not silently flip the sort to ascending.
    let descending = matches!(
        segments.next(),
        Some(direction) if direction.trim().eq_ignore_ascii_case("desc")
    );

    Ok(GatewayRpcOrder {
        column: column.to_string(),
        ascending: !descending,
    })
}
/// Interprets `raw` as an `operator.payload` filter expression.
///
/// `raw` is split at its first `.`. The part before it must be one of `eq`, `neq`,
/// `gt`, `gte`, `lt`, `lte`, `in`, `like`, `ilike` or `is` (any ASCII case);
/// otherwise, or when there is no `.` at all, `None` is returned.
///
/// For `in`, leading `(` and trailing `)` characters are stripped from the trimmed
/// payload, which is then split on commas; empty segments are dropped and the rest
/// are converted with [`parse_rpc_argument_value`] into a JSON array. For every
/// other operator the trimmed payload is converted as a single value.
pub fn parse_rpc_filter_expression(raw: &str) -> Option<(GatewayRpcFilterOperator, Value)> {
    let (keyword, operand) = raw.split_once('.')?;
    let keyword = keyword.to_ascii_lowercase();

    let operator = match keyword.as_str() {
        "eq" => GatewayRpcFilterOperator::Eq,
        "neq" => GatewayRpcFilterOperator::Neq,
        "gt" => GatewayRpcFilterOperator::Gt,
        "gte" => GatewayRpcFilterOperator::Gte,
        "lt" => GatewayRpcFilterOperator::Lt,
        "lte" => GatewayRpcFilterOperator::Lte,
        "in" => GatewayRpcFilterOperator::In,
        "like" => GatewayRpcFilterOperator::Like,
        "ilike" => GatewayRpcFilterOperator::ILike,
        "is" => GatewayRpcFilterOperator::Is,
        _ => return None,
    };

    // Scalar operators: the whole payload is one value.
    if !matches!(operator, GatewayRpcFilterOperator::In) {
        return Some((operator, parse_rpc_argument_value(operand.trim())));
    }

    // `in` takes a parenthesised, comma-separated list.
    let list_body = operand
        .trim()
        .trim_start_matches('(')
        .trim_end_matches(')');
    let mut members: Vec<Value> = Vec::new();
    for item in list_body.split(',') {
        let item = item.trim();
        if !item.is_empty() {
            members.push(parse_rpc_argument_value(item));
        }
    }

    Some((operator, Value::Array(members)))
}
/// Converts a raw textual argument into the closest matching JSON value.
///
/// Rules, applied in order after trimming surrounding whitespace:
///
/// 1. `{a,b,c}` becomes an array. The text between the braces is split on every
///    comma; each element is trimmed, stripped of surrounding double quotes and
///    converted recursively. `{}` (optionally with inner whitespace) is the empty
///    array. Because the split is purely comma-based, nested arrays and commas
///    inside quotes are not understood.
/// 2. `null`, `true` and `false` (any ASCII case) become the matching JSON literal.
/// 3. Text that parses as `i64` becomes an integer; otherwise text that parses as a
///    *finite* `f64` becomes a float.
/// 4. Everything else is kept as a string.
pub fn parse_rpc_argument_value(raw: &str) -> Value {
    let trimmed = raw.trim();

    if let Some(inner) = trimmed
        .strip_prefix('{')
        .and_then(|rest| rest.strip_suffix('}'))
    {
        if inner.trim().is_empty() {
            return Value::Array(Vec::new());
        }
        let values = inner
            .split(',')
            .map(|segment| parse_rpc_argument_value(segment.trim().trim_matches('"')))
            .collect::<Vec<Value>>();
        return Value::Array(values);
    }

    if trimmed.eq_ignore_ascii_case("null") {
        return Value::Null;
    }
    if trimmed.eq_ignore_ascii_case("true") {
        return Value::Bool(true);
    }
    if trimmed.eq_ignore_ascii_case("false") {
        return Value::Bool(false);
    }

    if let Ok(integer) = trimmed.parse::<i64>() {
        return serde_json::json!(integer);
    }

    if let Ok(float) = trimmed.parse::<f64>() {
        // The f64 parser also accepts "nan", "inf", "infinity" and overflowing
        // literals such as "1e999". JSON cannot represent those and `json!` would
        // turn them into `null`, silently discarding the caller's text, so only
        // finite numbers are treated as floats.
        if float.is_finite() {
            return serde_json::json!(float);
        }
    }

    Value::String(trimmed.to_string())
}
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// An `eq.` prefix yields the `Eq` operator and keeps the payload as a string.
    #[test]
    fn parse_rpc_filter_expression_supports_eq() {
        let (operator, value) = parse_rpc_filter_expression("eq.The Shire").expect("filter");

        assert!(matches!(operator, GatewayRpcFilterOperator::Eq));
        assert_eq!(value, json!("The Shire"));
    }

    /// Brace-delimited literals are converted into JSON arrays of typed elements.
    #[test]
    fn parse_rpc_argument_value_supports_array_literals() {
        let expected = json!([1, 2, 3]);

        assert_eq!(parse_rpc_argument_value("{1,2,3}"), expected);
    }

    /// `in.(a,b)` yields the `In` operator with the listed members as an array.
    #[test]
    fn parse_rpc_filter_expression_supports_in_operator_tuple_syntax() {
        let (operator, value) =
            parse_rpc_filter_expression("in.(The Shire,Mordor)").expect("filter");

        assert!(matches!(operator, GatewayRpcFilterOperator::In));
        assert_eq!(value, json!(["The Shire", "Mordor"]));
    }

    /// A GET query string mixes array arguments, column filters and reserved keys.
    #[test]
    fn rpc_get_compat_parses_array_args_and_filters() {
        let query = "arr=%7B1,2,3%7D&name=eq.The%20Shire&count=exact&order=name.desc";
        let request = rpc_request_from_get_compat("plus_one".to_string(), query)
            .expect("valid GET rpc request");

        assert_eq!(request.function, "plus_one");
        assert_eq!(request.args.get("arr"), Some(&json!([1, 2, 3])));

        let [only_filter] = request.filters.as_slice() else {
            panic!("expected exactly one filter");
        };
        assert!(matches!(
            only_filter.operator,
            GatewayRpcFilterOperator::Eq
        ));
        assert_eq!(only_filter.column, "name");
        assert_eq!(only_filter.value, json!("The Shire"));

        assert_eq!(request.count.as_deref(), Some("exact"));

        let order = request.order.as_ref().expect("order");
        assert_eq!(order.column, "name");
        assert!(!order.ascending);
    }

    /// A POST body using the wrapped shape populates every request field.
    #[test]
    fn rpc_post_compat_accepts_wrapped_body_shape() {
        let body = json!({
            "schema": "public",
            "args": { "name": "Athena" },
            "filters": [{ "column": "name", "operator": "eq", "value": "Athena" }],
            "select": "name",
            "count": "exact",
            "limit": 10,
            "offset": 5,
            "order": { "column": "name", "ascending": true }
        });

        let request = rpc_request_from_post_compat("hello_world".to_string(), body)
            .expect("valid POST rpc wrapper");

        assert_eq!(request.function, "hello_world");
        assert_eq!(request.schema, "public");
        assert_eq!(request.args.get("name"), Some(&json!("Athena")));
        assert_eq!(request.select.as_deref(), Some("name"));
        assert_eq!(request.count.as_deref(), Some("exact"));
        assert_eq!(request.limit, Some(10));
        assert_eq!(request.offset, Some(5));
        assert_eq!(request.filters.len(), 1);
        assert!(request.order.is_some());
    }
}