use serde_json::Value;
use crate::{
GatewayFetchConditionError, GatewayFetchRequest, GatewayRequestCondition, PostProcessingConfig,
SortOptions, StructuredGatewayFetchPlan, build_structured_fetch_plan,
parse_gateway_fetch_conditions, parse_sort_options_from_body,
};
/// Normalized view of a gateway fetch request body, as produced by
/// `build_gateway_fetch_body_plan`.
///
/// A body is handled either as a structured select (when
/// `build_structured_fetch_plan` yields a plan) or as a legacy request parsed
/// through `GatewayFetchRequest::from_body`; several fields below are only
/// populated from the body in one of those two modes.
#[derive(Debug, Clone)]
pub struct GatewayFetchBodyPlan {
/// Structured select plan, when one could be built from the body; `None`
/// for legacy bodies and when no body was supplied.
pub structured_fetch_plan: Option<StructuredGatewayFetchPlan>,
/// Target table: the structured plan's `table_name`, otherwise the legacy
/// request's qualified table name. Empty when no body was supplied.
pub table_name: String,
/// Conditions parsed from a legacy body. Always empty for structured plans
/// and when no body was supplied.
pub conditions: Vec<GatewayRequestCondition>,
/// Page number from a legacy body; defaults to `1`.
pub current_page: i64,
/// Page size from a legacy body; defaults to `100`.
pub page_size: i64,
/// Row offset from a legacy body; defaults to `0`.
pub offset: i64,
/// Row limit: the structured plan's limit if it has one, else the legacy
/// body's limit, else `page_size`.
pub limit: i64,
/// Requested columns. Holds the single placeholder
/// `"__structured_select__"` for structured plans; otherwise the legacy
/// body's columns, or `["*"]` when none were given.
pub columns: Vec<String>,
/// Post-processing settings built by `PostProcessingConfig::from_body`.
pub post_processing_config: PostProcessingConfig,
/// Sort options parsed from the body; always `None` for structured plans.
pub sort_options: Option<SortOptions>,
/// Clone of the original request body, or `None` when no body was supplied.
pub request_payload_for_webhook: Option<Value>,
}
/// Reasons `build_gateway_fetch_body_plan` can reject a request body.
#[derive(Debug, Clone)]
pub enum GatewayFetchBodyPlanError {
/// `build_structured_fetch_plan` failed; carries its error message.
InvalidStructuredSelect(String),
/// The legacy request's `qualified_table_name()` failed; carries its error
/// message.
InvalidSchemaTableSelector(String),
/// `parse_gateway_fetch_conditions` rejected the body's conditions.
InvalidCondition(GatewayFetchConditionError),
}
/// Turns an optional JSON request body into a [`GatewayFetchBodyPlan`].
///
/// The body is first offered to `build_structured_fetch_plan`. If that yields
/// a plan, the table name and limit come from it, `columns` is set to the
/// `"__structured_select__"` placeholder, and conditions / sort options are
/// left empty. Otherwise the body is read as a legacy
/// `GatewayFetchRequest`, from which table name, conditions, paging, limit,
/// columns and sort options are taken. With no body at all, every field falls
/// back to its default (page 1, page size 100, offset 0, columns `["*"]`).
///
/// `force_camel_case_to_snake_case` is forwarded unchanged to every parser
/// that is invoked.
///
/// # Errors
///
/// Checked in this order:
/// * [`GatewayFetchBodyPlanError::InvalidStructuredSelect`] when
///   `build_structured_fetch_plan` fails.
/// * [`GatewayFetchBodyPlanError::InvalidSchemaTableSelector`] when the legacy
///   request's `qualified_table_name()` fails.
/// * [`GatewayFetchBodyPlanError::InvalidCondition`] when
///   `parse_gateway_fetch_conditions` fails.
pub fn build_gateway_fetch_body_plan(
    body: Option<&Value>,
    force_camel_case_to_snake_case: bool,
) -> Result<GatewayFetchBodyPlan, GatewayFetchBodyPlanError> {
    const DEFAULT_CURRENT_PAGE: i64 = 1;
    const DEFAULT_PAGE_SIZE: i64 = 100;
    const DEFAULT_OFFSET: i64 = 0;

    // A missing body and a body without a structured select both end up as `None`.
    let structured_fetch_plan = body
        .map(|payload| build_structured_fetch_plan(payload, force_camel_case_to_snake_case))
        .transpose()
        .map_err(GatewayFetchBodyPlanError::InvalidStructuredSelect)?
        .flatten();

    // The legacy parser only runs when the structured path did not claim the body.
    let legacy_request = match (&structured_fetch_plan, body) {
        (None, Some(payload)) => Some(GatewayFetchRequest::from_body(
            payload,
            force_camel_case_to_snake_case,
        )),
        _ => None,
    };

    let table_name = match (&structured_fetch_plan, &legacy_request) {
        (Some(structured), _) => structured.table_name.clone(),
        (None, Some(request)) => request
            .qualified_table_name()
            .map_err(GatewayFetchBodyPlanError::InvalidSchemaTableSelector)?,
        (None, None) => String::new(),
    };

    let conditions = match body {
        Some(payload) if structured_fetch_plan.is_none() => {
            parse_gateway_fetch_conditions(payload, force_camel_case_to_snake_case)
                .map_err(GatewayFetchBodyPlanError::InvalidCondition)?
        }
        _ => Vec::new(),
    };

    let (current_page, page_size, offset) = match &legacy_request {
        Some(request) => (
            request.current_page.unwrap_or(DEFAULT_CURRENT_PAGE),
            request.page_size.unwrap_or(DEFAULT_PAGE_SIZE),
            request.offset.unwrap_or(DEFAULT_OFFSET),
        ),
        None => (DEFAULT_CURRENT_PAGE, DEFAULT_PAGE_SIZE, DEFAULT_OFFSET),
    };

    // Precedence: structured plan limit, then legacy body limit, then page size.
    let structured_limit = structured_fetch_plan
        .as_ref()
        .and_then(|structured| structured.limit());
    let legacy_limit = legacy_request.as_ref().and_then(|request| request.limit);
    let limit = structured_limit.or(legacy_limit).unwrap_or(page_size);

    let columns = match (&structured_fetch_plan, &legacy_request) {
        (Some(_), _) => vec!["__structured_select__".to_string()],
        (None, Some(request)) if !request.columns.is_empty() => request.columns.clone(),
        // No legacy request, or one that listed no columns: select everything.
        _ => vec!["*".to_string()],
    };

    let post_processing_config =
        PostProcessingConfig::from_body(body, force_camel_case_to_snake_case);

    let sort_options = match structured_fetch_plan {
        Some(_) => None,
        None => parse_sort_options_from_body(body, force_camel_case_to_snake_case),
    };

    let request_payload_for_webhook = body.cloned();

    Ok(GatewayFetchBodyPlan {
        structured_fetch_plan,
        table_name,
        conditions,
        current_page,
        page_size,
        offset,
        limit,
        columns,
        post_processing_config,
        sort_options,
        request_payload_for_webhook,
    })
}
#[cfg(test)]
mod tests {
    use super::{GatewayFetchBodyPlanError, build_gateway_fetch_body_plan};
    use serde_json::json;

    /// Without a body every field takes its default value.
    #[test]
    fn fetch_body_plan_defaults_without_body() {
        let built = build_gateway_fetch_body_plan(None, false).expect("plan should parse");

        assert!(built.structured_fetch_plan.is_none());
        assert!(built.request_payload_for_webhook.is_none());
        assert!(built.sort_options.is_none());
        assert!(built.conditions.is_empty());
        assert!(built.table_name.is_empty());
        assert_eq!(
            (built.current_page, built.page_size, built.offset, built.limit),
            (1, 100, 0, 100)
        );
        assert_eq!(built.columns, ["*"]);
    }

    /// A legacy body supplies table, paging, columns, conditions and sort.
    #[test]
    fn fetch_body_plan_parses_legacy_body() {
        let legacy_body = json!({
            "table_name": "users",
            "schema": "public",
            "columns": ["id", "displayName"],
            "conditions": [
                { "eq_column": "roomId", "eq_value": "42" }
            ],
            "current_page": 3,
            "page_size": 25,
            "offset": 4,
            "limit": 10,
            "sort_by": {
                "field": "displayName",
                "direction": "asc"
            }
        });

        let built =
            build_gateway_fetch_body_plan(Some(&legacy_body), true).expect("plan should parse");

        assert!(built.structured_fetch_plan.is_none());
        assert!(built.request_payload_for_webhook.is_some());
        assert_eq!(built.table_name, "public.users");
        assert_eq!(
            (built.current_page, built.page_size, built.offset, built.limit),
            (3, 25, 4, 10)
        );
        assert_eq!(built.columns, ["id", "display_name"]);

        assert_eq!(built.conditions.len(), 1);
        let only_condition = &built.conditions[0];
        // NOTE(review): with snake-casing enabled the columns and sort field are
        // expected as `display_name`, yet the condition column is expected to stay
        // `roomId`, and the string "42" is expected back as the number 42. Confirm
        // both against `parse_gateway_fetch_conditions`.
        assert_eq!(only_condition.eq_column, "roomId");
        assert_eq!(only_condition.eq_value, json!(42));

        let sort_column = built
            .sort_options
            .as_ref()
            .map(|options| options.column.as_str());
        assert_eq!(sort_column, Some("display_name"));
    }

    /// A body with a `select` string is planned as a structured fetch.
    #[test]
    fn fetch_body_plan_parses_structured_body() {
        let structured_body = json!({
            "table_name": "public.chat_subscriptions",
            "select": "user_id,users:athena.users!inner(id,username)",
            "limit": 5
        });

        let built = build_gateway_fetch_body_plan(Some(&structured_body), false)
            .expect("plan should parse");

        assert!(built.structured_fetch_plan.is_some());
        assert!(built.conditions.is_empty());
        assert!(built.sort_options.is_none());
        assert_eq!(built.table_name, "public.chat_subscriptions");
        assert_eq!(built.limit, 5);
        assert_eq!(built.columns, ["__structured_select__"]);
    }

    /// A condition without a value surfaces as `InvalidCondition`.
    #[test]
    fn fetch_body_plan_surfaces_condition_errors() {
        let incomplete_body = json!({
            "table_name": "users",
            "conditions": [
                { "eq_column": "roomId" }
            ]
        });

        let failure = build_gateway_fetch_body_plan(Some(&incomplete_body), true)
            .expect_err("plan should fail");

        match failure {
            GatewayFetchBodyPlanError::InvalidCondition(condition_err) => {
                let message = condition_err.message();
                assert!(message.contains("room_id is required"));
            }
            other => panic!("expected invalid condition error, got {other:?}"),
        }
    }
}