athena-gateway 3.18.0

Portable gateway request contracts and normalization primitives for Athena
Documentation
//! Request-domain preprocessing for `/gateway/fetch` and `/gateway/data`.
//!
//! This module turns an optional fetch JSON body into a portable execution
//! plan shared by the runtime route adapters. It owns the legacy-versus-
//! structured shape split, body-level validation, normalized table selection,
//! pagination defaults, and request-domain parsing for conditions and sorting.

use serde_json::Value;

use crate::{
    GatewayFetchConditionError, GatewayFetchRequest, GatewayRequestCondition, PostProcessingConfig,
    SortOptions, StructuredGatewayFetchPlan, build_structured_fetch_plan,
    parse_gateway_fetch_conditions, parse_sort_options_from_body,
};

/// Parsed fetch-body plan used by the runtime adapter to authorize and execute a read.
///
/// Produced by [`build_gateway_fetch_body_plan`]. A plan is either *structured*
/// (`structured_fetch_plan` is `Some`) or *legacy* (`structured_fetch_plan` is
/// `None`); several fields are only populated from the body in the legacy case
/// and fall back to fixed defaults otherwise.
#[derive(Debug, Clone)]
pub struct GatewayFetchBodyPlan {
    /// Parsed structured-fetch plan when the request uses the structured select contract.
    ///
    /// `None` for legacy bodies and when no body was supplied.
    pub structured_fetch_plan: Option<StructuredGatewayFetchPlan>,
    /// Normalized table selector used for rights, SQL compilation, and execution.
    ///
    /// Taken from the structured plan when present, otherwise from the legacy
    /// request's qualified table name. Empty when no body was supplied.
    pub table_name: String,
    /// Parsed equality conditions for legacy fetch bodies.
    ///
    /// Always empty for structured requests and when no body was supplied.
    pub conditions: Vec<GatewayRequestCondition>,
    /// Resolved page number, defaulting to `1`.
    ///
    /// Only read from legacy bodies; structured requests always get the default.
    pub current_page: i64,
    /// Resolved page size, defaulting to `100`.
    ///
    /// Only read from legacy bodies; structured requests always get the default.
    pub page_size: i64,
    /// Resolved offset, defaulting to `0`.
    ///
    /// Only read from legacy bodies; structured requests always get the default.
    pub offset: i64,
    /// Effective limit derived from the structured plan, legacy body, or page size default.
    ///
    /// Precedence is: structured plan limit, then legacy body limit, then `page_size`.
    pub limit: i64,
    /// Selected columns, with the structured sentinel when the request uses structured fetch.
    ///
    /// The sentinel is the single entry `"__structured_select__"`. Legacy
    /// requests with no columns (or no body at all) resolve to `["*"]`, so
    /// this vector is never empty.
    pub columns: Vec<String>,
    /// Post-processing configuration derived from the body (or from its absence).
    pub post_processing_config: PostProcessingConfig,
    /// Sort options for legacy fetch bodies.
    ///
    /// Always `None` for structured requests.
    pub sort_options: Option<SortOptions>,
    /// Cloned request body for webhook/deferred payload passthrough.
    ///
    /// `None` when no body was supplied.
    pub request_payload_for_webhook: Option<Value>,
}

/// Body-preprocessing validation errors for gateway fetch request parsing.
///
/// Each variant corresponds to one parsing step inside
/// [`build_gateway_fetch_body_plan`] and wraps that step's own error value.
#[derive(Debug, Clone)]
pub enum GatewayFetchBodyPlanError {
    /// Building the structured fetch plan from the body failed; carries the
    /// message returned by `build_structured_fetch_plan`.
    InvalidStructuredSelect(String),
    /// Resolving the qualified table name of a legacy body failed; carries the
    /// message returned by `GatewayFetchRequest::qualified_table_name`.
    InvalidSchemaTableSelector(String),
    /// Parsing the conditions of a legacy body failed; carries the error
    /// returned by `parse_gateway_fetch_conditions`.
    InvalidCondition(GatewayFetchConditionError),
}

/// Builds the portable request-domain plan for a fetch body.
///
/// The body is first offered to the structured-select parser. When that yields
/// a plan, the request is treated as *structured*: table name and limit come
/// from the plan, the column list is the `"__structured_select__"` sentinel,
/// and legacy-only fields (conditions, pagination, sorting) keep their
/// defaults. Otherwise the body, if any, is parsed as a *legacy* request.
///
/// `force_camel_case_to_snake_case` is forwarded unchanged to every parser.
///
/// # Errors
///
/// Checks run in this order and the first failure is returned:
/// * [`GatewayFetchBodyPlanError::InvalidStructuredSelect`] when the structured
///   plan cannot be built,
/// * [`GatewayFetchBodyPlanError::InvalidSchemaTableSelector`] when a legacy
///   body's qualified table name cannot be resolved,
/// * [`GatewayFetchBodyPlanError::InvalidCondition`] when a legacy body's
///   conditions cannot be parsed.
pub fn build_gateway_fetch_body_plan(
    body: Option<&Value>,
    force_camel_case_to_snake_case: bool,
) -> Result<GatewayFetchBodyPlan, GatewayFetchBodyPlanError> {
    let snake_case = force_camel_case_to_snake_case;

    // Step 1: structured detection. No body means no structured plan.
    let structured_fetch_plan = if let Some(raw) = body {
        build_structured_fetch_plan(raw, snake_case)
            .map_err(GatewayFetchBodyPlanError::InvalidStructuredSelect)?
    } else {
        None
    };
    let is_structured = structured_fetch_plan.is_some();

    // Step 2: the legacy view of the body is only materialized for
    // non-structured requests.
    let legacy_request = match body {
        Some(raw) if !is_structured => Some(GatewayFetchRequest::from_body(raw, snake_case)),
        _ => None,
    };

    // Step 3: table selector — structured plan wins, then the legacy request,
    // and an empty name when there is no body at all.
    let table_name = match (structured_fetch_plan.as_ref(), legacy_request.as_ref()) {
        (Some(plan), _) => plan.table_name.clone(),
        (None, Some(request)) => request
            .qualified_table_name()
            .map_err(GatewayFetchBodyPlanError::InvalidSchemaTableSelector)?,
        (None, None) => String::new(),
    };

    // Step 4: conditions are a legacy-only concept.
    let conditions = match body {
        Some(raw) if !is_structured => parse_gateway_fetch_conditions(raw, snake_case)
            .map_err(GatewayFetchBodyPlanError::InvalidCondition)?,
        _ => Vec::new(),
    };

    // Step 5: pagination, read from the legacy request with fixed fallbacks.
    let (current_page, page_size, offset, legacy_limit) = match legacy_request.as_ref() {
        Some(request) => (
            request.current_page.unwrap_or(1),
            request.page_size.unwrap_or(100),
            request.offset.unwrap_or(0),
            request.limit,
        ),
        None => (1, 100, 0, None),
    };
    // Limit precedence: structured plan, then legacy body, then page size.
    let limit = structured_fetch_plan
        .as_ref()
        .and_then(|plan| plan.limit())
        .or(legacy_limit)
        .unwrap_or(page_size);

    // Step 6: column selection. An empty or missing legacy list means "all".
    let columns = if is_structured {
        vec!["__structured_select__".to_string()]
    } else {
        match legacy_request.as_ref() {
            Some(request) if !request.columns.is_empty() => request.columns.clone(),
            _ => vec!["*".to_string()],
        }
    };

    // Step 7: post-processing applies to both shapes; sorting is legacy-only.
    let post_processing_config = PostProcessingConfig::from_body(body, snake_case);
    let sort_options = if !is_structured {
        parse_sort_options_from_body(body, snake_case)
    } else {
        None
    };

    let request_payload_for_webhook = body.cloned();

    Ok(GatewayFetchBodyPlan {
        structured_fetch_plan,
        table_name,
        conditions,
        current_page,
        page_size,
        offset,
        limit,
        columns,
        post_processing_config,
        sort_options,
        request_payload_for_webhook,
    })
}

#[cfg(test)]
mod tests {
    use super::{GatewayFetchBodyPlanError, build_gateway_fetch_body_plan};
    use serde_json::json;

    /// Without a body every field resolves to its documented default.
    #[test]
    fn fetch_body_plan_defaults_without_body() {
        let defaults = build_gateway_fetch_body_plan(None, false).expect("plan should parse");

        // Shape and selectors.
        assert!(defaults.structured_fetch_plan.is_none());
        assert!(defaults.table_name.is_empty());
        assert_eq!(defaults.columns, ["*"]);

        // Pagination.
        assert_eq!(defaults.current_page, 1);
        assert_eq!(defaults.page_size, 100);
        assert_eq!(defaults.offset, 0);
        assert_eq!(defaults.limit, 100);

        // Legacy-only extras and passthrough payload.
        assert!(defaults.conditions.is_empty());
        assert!(defaults.sort_options.is_none());
        assert!(defaults.request_payload_for_webhook.is_none());
    }

    /// A legacy body populates table, pagination, columns, conditions, and sort.
    #[test]
    fn fetch_body_plan_parses_legacy_body() {
        let legacy_body = json!({
            "table_name": "users",
            "schema": "public",
            "columns": ["id", "displayName"],
            "conditions": [
                { "eq_column": "roomId", "eq_value": "42" }
            ],
            "current_page": 3,
            "page_size": 25,
            "offset": 4,
            "limit": 10,
            "sort_by": {
                "field": "displayName",
                "direction": "asc"
            }
        });

        let parsed =
            build_gateway_fetch_body_plan(Some(&legacy_body), true).expect("plan should parse");

        assert!(parsed.structured_fetch_plan.is_none());
        assert_eq!(parsed.table_name, "public.users");

        // Explicit pagination values override the defaults.
        assert_eq!(parsed.current_page, 3);
        assert_eq!(parsed.page_size, 25);
        assert_eq!(parsed.offset, 4);
        assert_eq!(parsed.limit, 10);

        assert_eq!(parsed.columns, ["id", "display_name"]);

        assert_eq!(parsed.conditions.len(), 1);
        let only_condition = &parsed.conditions[0];
        assert_eq!(only_condition.eq_column, "roomId");
        assert_eq!(only_condition.eq_value, json!(42));

        let sort_column = parsed.sort_options.as_ref().map(|sort| sort.column.as_str());
        assert_eq!(sort_column, Some("display_name"));

        assert!(parsed.request_payload_for_webhook.is_some());
    }

    /// A body with a `select` string takes the structured path.
    #[test]
    fn fetch_body_plan_parses_structured_body() {
        let structured_body = json!({
            "table_name": "public.chat_subscriptions",
            "select": "user_id,users:athena.users!inner(id,username)",
            "limit": 5
        });

        let parsed = build_gateway_fetch_body_plan(Some(&structured_body), false)
            .expect("plan should parse");

        assert!(parsed.structured_fetch_plan.is_some());
        assert_eq!(parsed.table_name, "public.chat_subscriptions");
        assert_eq!(parsed.limit, 5);
        assert_eq!(parsed.columns, ["__structured_select__"]);

        // Legacy-only fields stay empty on the structured path.
        assert!(parsed.conditions.is_empty());
        assert!(parsed.sort_options.is_none());
    }

    /// A condition without a value surfaces as `InvalidCondition`.
    #[test]
    fn fetch_body_plan_surfaces_condition_errors() {
        let incomplete_body = json!({
            "table_name": "users",
            "conditions": [
                { "eq_column": "roomId" }
            ]
        });

        let failure = build_gateway_fetch_body_plan(Some(&incomplete_body), true)
            .expect_err("plan should fail");

        match failure {
            GatewayFetchBodyPlanError::InvalidCondition(condition_err) => {
                let message = condition_err.message();
                assert!(message.contains("room_id is required"));
            }
            other => panic!("expected invalid condition error, got {other:?}"),
        }
    }
}