Skip to main content

athena_gateway/
lib.rs

1//! Portable gateway-domain contracts for Athena.
2//!
3//! This crate intentionally stays free of Actix and runtime registry concerns.
4//! It owns the reusable request/response DTOs and normalization helpers that
5//! power `/gateway/*` routes across adapters, SDKs, deferred queue payloads,
6//! and future server splits.
7//!
8//! The current scope covers:
9//!
10//! - gateway operation kinds,
11//! - fetch/insert/update/delete/rpc request bodies,
12//! - delete-side resource-ID lookup planning and explicit column normalization,
13//! - deferred gateway queue payloads,
14//! - gateway authorization right naming and wildcard matching helpers,
15//! - fetch request parsing and post-processing helpers,
16//! - fetch body preprocessing for legacy and structured gateway fetch requests,
17//! - legacy `GET /data` compatibility planning and cache-key derivation,
18//! - fetch singleflight coordination for cache-equivalent gateway reads,
19//! - metadata-aware fetch condition builders and stats-rollup type fallbacks,
20//! - PostgREST compatibility parsing and filter conversion helpers,
21//! - RPC compatibility parsing for legacy `/rpc/{function_name}` routes,
22//! - gateway resource-ID fallback resolution heuristics for insert/delete/upsert paths,
23//! - row-response metadata,
24//! - schema/table normalization helpers for gateway requests,
25//! - `/gateway/query` request parsing, normalization, and bounded compatibility planning,
26//! - `/gateway/update` request planning, condition coercion, and derived write-right helpers,
27//! - `/gateway/delete` request planning, resource-ID lookup planning, and derived delete-right helpers,
28//! - relation-select compatibility rewrite helpers for bounded
29//!   `/gateway/query` SQL-to-structured-fetch translation,
30//! - structured-fetch SQL compilation, metadata loading, and execution helpers.
31
32mod auth_rights;
33mod delete;
34mod fetch;
35mod fetch_body_plan;
36mod fetch_conditions;
37mod fetch_get_compat;
38mod fetch_singleflight;
39mod postgrest;
40mod query_plan;
41mod relation_select_compatibility;
42mod resource_id;
43mod rpc_compat;
44mod sql;
45mod structured_fetch;
46mod structured_fetch_sql;
47mod update_plan;
48
49use serde::{Deserialize, Serialize};
50use serde_json::{Map, Value};
51use std::str::FromStr;
52
/// Validates `identifier` as a plain SQL identifier and returns it wrapped in
/// double quotes.
///
/// The first character must be an ASCII letter or `_`; every later character
/// must be an ASCII letter, an ASCII digit, or `_`. Returns `None` for the
/// empty string and for any input containing another character.
fn sanitize_identifier(identifier: &str) -> Option<String> {
    // Digits are the only characters whose validity depends on position.
    let every_char_allowed = identifier.chars().enumerate().all(|(position, ch)| {
        ch == '_' || ch.is_ascii_alphabetic() || (position > 0 && ch.is_ascii_digit())
    });

    if identifier.is_empty() || !every_char_allowed {
        None
    } else {
        Some(format!("\"{identifier}\""))
    }
}
64
/// Converts `camelCase` or `PascalCase` identifiers to `snake_case`.
///
/// Every ASCII uppercase letter is lowercased. An underscore is inserted
/// before it when it starts a new word, i.e. when the previous character is
/// an ASCII lowercase letter or digit, or when it ends a run of capitals and
/// is followed by a lowercase letter (`HTTPServer` -> `http_server`).
/// All other characters are copied through unchanged.
pub fn camel_to_snake_case(input: &str) -> String {
    let letters: Vec<char> = input.chars().collect();
    let mut output = String::with_capacity(input.len() * 2);

    for (index, &current) in letters.iter().enumerate() {
        if !current.is_ascii_uppercase() {
            output.push(current);
            continue;
        }

        // The very first character never gets a leading underscore.
        if index > 0 {
            let before = letters[index - 1];
            let followed_by_lower = letters
                .get(index + 1)
                .map_or(false, |after| after.is_ascii_lowercase());

            let starts_word = before.is_ascii_lowercase() || before.is_ascii_digit();
            let ends_acronym = before.is_ascii_uppercase() && followed_by_lower;
            if starts_word || ends_acronym {
                output.push('_');
            }
        }

        output.push(current.to_ascii_lowercase());
    }

    output
}
94
95/// Normalizes a gateway column name, optionally forcing `snake_case`.
96pub fn normalize_column_name(column: &str, force: bool) -> String {
97    if !force || column == "*" {
98        return column.to_string();
99    }
100
101    camel_to_snake_case(column)
102}
103
104pub use auth_rights::{
105    delete_right_for_resource, missing_required_rights, query_right, read_right_for_resource,
106    right_matches, rpc_right, storage_proxy_right, typesense_proxy_right, write_right_for_resource,
107};
108pub use delete::{
109    GatewayDeleteRequestPlan, GatewayDeleteRequestPlanError, GatewayDeleteResourceIdPlan,
110    build_gateway_delete_request_plan, delete_request_required_right,
111    normalize_delete_resource_id_column_name, normalize_delete_resource_id_lookup_table,
112    plan_delete_resource_id_resolution,
113};
114pub use fetch::{
115    AggregationStrategy, GatewayFetchConditionError, PostProcessingConfig, SortOptions,
116    TimeGranularity, apply_post_processing, build_fetch_hashed_cache_key,
117    build_fetch_hashed_cache_key_legacy8, coerce_room_id_eq_value, parse_gateway_fetch_conditions,
118    parse_room_id_value, parse_sort_options_from_body,
119};
120pub use fetch_body_plan::{
121    GatewayFetchBodyPlan, GatewayFetchBodyPlanError, build_gateway_fetch_body_plan,
122};
123pub use fetch_conditions::{
124    RequestCondition, merge_column_types_with_stats_fallback, resolve_where_column_types,
125    to_query_conditions, to_query_conditions_with_types,
126};
127pub use fetch_get_compat::{
128    GatewayGetFetchCompatibilityError, GatewayGetFetchCompatibilityPlan,
129    build_gateway_get_fetch_compatibility_plan,
130};
131pub use fetch_singleflight::{
132    GatewayFetchRowsResult, GatewayFetchSingleflight, GatewayFetchSingleflightRole,
133    GatewayInFlightFetch,
134};
135pub use postgrest::{
136    OrderSpec, PostgrestFilter, PostgrestFilterOperator, PostgrestQuery,
137    convert_postgrest_filter_to_condition, convert_postgrest_filters_to_conditions,
138    convert_postgrest_or_filter_groups_to_conditions, parse_postgrest_query,
139};
140pub use query_plan::{
141    GatewayQueryCompatibilityPlan, GatewayQueryRequestParseError, GatewayQueryRequestPlan,
142    GatewayQueryRequestPlanError, build_gateway_query_request_plan,
143    parse_gateway_query_request_body,
144};
145pub use relation_select_compatibility::{
146    GatewayRelationSelectRewrite, GatewayRelationSelectTableRef, try_rewrite_relation_select_query,
147};
148pub use resource_id::{
149    find_closest_uuid_column, get_resource_id_key_with_uuid_loader,
150    parse_uuid_columns_from_schema_rows,
151};
152pub use rpc_compat::{
153    parse_rpc_argument_value, parse_rpc_filter_expression, parse_rpc_order,
154    rpc_request_from_get_compat, rpc_request_from_post_compat,
155};
156pub use sql::{
157    GatewayApiRequest, GatewayApiRequestPayload, GatewaySqlExecutionMode,
158    GatewaySqlExecutionRequest, GatewaySqlRequest,
159};
160pub use structured_fetch::{
161    StructuredColumnField, StructuredFilter, StructuredFilterOperator, StructuredGatewayFetchPlan,
162    StructuredJoinKind, StructuredOrderBy, StructuredRelationField, StructuredSelectField,
163    StructuredSelectOperation, StructuredSelectQuery, StructuredSortDirection,
164    build_structured_fetch_cache_key, build_structured_fetch_plan,
165};
166pub use structured_fetch_sql::{execute_structured_fetch_sql, render_structured_fetch_sql};
167pub use update_plan::{
168    GatewayUpdateRequestPlan, GatewayUpdateRequestPlanError, build_gateway_update_request_plan,
169};
170
171/// Domain-owned operation kinds for gateway handlers.
172#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
173#[serde(rename_all = "snake_case")]
174pub enum GatewayOperationKind {
175    Fetch,
176    Insert,
177    Update,
178    Delete,
179    Query,
180    Rpc,
181}
182
183impl GatewayOperationKind {
184    /// Returns the public route operation key used by `/public/{route_key}/{op}`.
185    pub const fn as_str(self) -> &'static str {
186        match self {
187            Self::Fetch => "fetch",
188            Self::Insert => "insert",
189            Self::Update => "update",
190            Self::Delete => "delete",
191            Self::Query => "query",
192            Self::Rpc => "rpc",
193        }
194    }
195
196    /// Returns the durable deferred queue kind for this operation.
197    pub const fn deferred_kind(self) -> &'static str {
198        match self {
199            Self::Fetch => "gateway_fetch",
200            Self::Insert => "gateway_insert",
201            Self::Update => "gateway_update",
202            Self::Delete => "gateway_delete",
203            Self::Query => "gateway_query",
204            Self::Rpc => "gateway_rpc",
205        }
206    }
207
208    /// Parses a public route operation segment into a gateway operation kind.
209    pub fn from_public_route_op(raw: &str) -> Option<Self> {
210        match raw.trim().to_ascii_lowercase().as_str() {
211            "fetch" => Some(Self::Fetch),
212            "insert" => Some(Self::Insert),
213            "update" => Some(Self::Update),
214            "delete" => Some(Self::Delete),
215            "query" => Some(Self::Query),
216            "rpc" => Some(Self::Rpc),
217            _ => None,
218        }
219    }
220
221    /// Parses a deferred queue operation kind into a gateway operation kind.
222    pub fn from_deferred_kind(raw: &str) -> Option<Self> {
223        match raw.trim().to_ascii_lowercase().as_str() {
224            "gateway_fetch" => Some(Self::Fetch),
225            "gateway_insert" => Some(Self::Insert),
226            "gateway_update" => Some(Self::Update),
227            "gateway_delete" => Some(Self::Delete),
228            "gateway_query" => Some(Self::Query),
229            "gateway_rpc" => Some(Self::Rpc),
230            _ => None,
231        }
232    }
233}
234
235impl FromStr for GatewayOperationKind {
236    type Err = ();
237
238    fn from_str(value: &str) -> Result<Self, Self::Err> {
239        Self::from_public_route_op(value).ok_or(())
240    }
241}
242
243/// Simple equality filter used by gateway CRUD requests.
244#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
245pub struct GatewayRequestCondition {
246    pub eq_column: String,
247    pub eq_value: Value,
248}
249
250impl GatewayRequestCondition {
251    /// Builds a new equality condition.
252    pub fn new(eq_column: String, eq_value: Value) -> Self {
253        Self {
254            eq_column,
255            eq_value,
256        }
257    }
258}
259
260/// Canonical `/gateway/fetch` request shape.
261#[derive(Debug, Clone, Serialize, Deserialize)]
262pub struct GatewayFetchRequest {
263    pub table_name: String,
264    #[serde(default, alias = "schema")]
265    pub schema_name: Option<String>,
266    #[serde(default)]
267    pub columns: Vec<String>,
268    #[serde(default)]
269    pub conditions: Vec<GatewayRequestCondition>,
270    #[serde(default)]
271    pub current_page: Option<i64>,
272    #[serde(default)]
273    pub page_size: Option<i64>,
274    #[serde(default)]
275    pub limit: Option<i64>,
276    #[serde(default)]
277    pub offset: Option<i64>,
278}
279
280impl GatewayFetchRequest {
281    /// Parses a fetch request from a generic JSON body while preserving gateway
282    /// compatibility (`columns` accepts CSV or array).
283    pub fn from_body(body: &Value, force_camel_case_to_snake_case: bool) -> Self {
284        let table_name = body
285            .get("table_name")
286            .and_then(Value::as_str)
287            .unwrap_or_default()
288            .to_string();
289        let schema_name = schema_name_from_body(body);
290
291        let mut columns = parse_columns_from_body(body);
292        if columns.is_empty() {
293            columns.push("*".to_string());
294        }
295        if force_camel_case_to_snake_case {
296            columns = columns
297                .into_iter()
298                .map(|column| normalize_column_name(&column, true))
299                .collect();
300        }
301
302        let conditions = parse_conditions_from_body(body);
303        let current_page = body.get("current_page").and_then(Value::as_i64);
304        let page_size = body.get("page_size").and_then(Value::as_i64);
305        let limit = body.get("limit").and_then(Value::as_i64);
306        let offset = body.get("offset").and_then(Value::as_i64);
307
308        Self {
309            table_name,
310            schema_name,
311            columns,
312            conditions,
313            current_page,
314            page_size,
315            limit,
316            offset,
317        }
318    }
319
320    /// Returns the normalized `schema.table` target for downstream SQL builders.
321    pub fn qualified_table_name(&self) -> Result<String, String> {
322        qualify_gateway_table_name(&self.table_name, self.schema_name.as_deref())
323    }
324}
325
326/// Canonical `/gateway/insert` request shape.
327#[derive(Debug, Clone, Serialize, Deserialize)]
328pub struct GatewayInsertRequest {
329    pub table_name: String,
330    #[serde(default, alias = "schema")]
331    pub schema_name: Option<String>,
332    pub insert_body: Value,
333    #[serde(default)]
334    pub update_body: Option<Value>,
335}
336
337impl GatewayInsertRequest {
338    /// Reads only `table_name` from raw JSON for early auth checks.
339    pub fn table_name_from_body(body: &Value) -> Option<String> {
340        body.get("table_name")
341            .and_then(Value::as_str)
342            .map(str::to_string)
343            .filter(|name| !name.trim().is_empty())
344    }
345
346    /// Parses from raw JSON body.
347    ///
348    /// Supports `insert_body` as canonical field and `data` as compatibility alias.
349    pub fn from_body(body: &Value) -> Option<Self> {
350        let table_name = Self::table_name_from_body(body)?;
351        let insert_body = Self::insert_body_from_body(body)?;
352        let update_body = body.get("update_body").cloned();
353        let schema_name = schema_name_from_body(body);
354
355        Some(Self {
356            table_name,
357            schema_name,
358            insert_body,
359            update_body,
360        })
361    }
362
363    /// Reads canonical insert payload from raw JSON (`insert_body` or `data`).
364    pub fn insert_body_from_body(body: &Value) -> Option<Value> {
365        body.get("insert_body")
366            .cloned()
367            .or_else(|| body.get("data").cloned())
368    }
369
370    /// Returns the normalized `schema.table` target for downstream SQL builders.
371    pub fn qualified_table_name(&self) -> Result<String, String> {
372        qualify_gateway_table_name(&self.table_name, self.schema_name.as_deref())
373    }
374}
375
376/// Canonical `/gateway/update` request shape.
377#[derive(Debug, Clone, Serialize, Deserialize)]
378pub struct GatewayUpdateRequest {
379    pub table_name: String,
380    #[serde(default, alias = "schema")]
381    pub schema_name: Option<String>,
382    #[serde(default)]
383    pub conditions: Vec<GatewayRequestCondition>,
384    pub data: Value,
385}
386
387impl GatewayUpdateRequest {
388    /// Parses from raw JSON body.
389    ///
390    /// Accepts `columns` (array of objects), `data` object, or `set` object.
391    pub fn from_body(body: &Value, force_camel_case_to_snake_case: bool) -> Option<Self> {
392        let table_name = body
393            .get("table_name")
394            .and_then(Value::as_str)
395            .map(str::to_string)
396            .filter(|name| !name.trim().is_empty())?;
397        let set_payload = extract_update_payload(body, force_camel_case_to_snake_case)?;
398        let conditions = parse_conditions_from_body(body);
399        let schema_name = schema_name_from_body(body);
400
401        Some(Self {
402            table_name,
403            schema_name,
404            conditions,
405            data: Value::Object(set_payload),
406        })
407    }
408
409    /// Returns the normalized `schema.table` target for downstream SQL builders.
410    pub fn qualified_table_name(&self) -> Result<String, String> {
411        qualify_gateway_table_name(&self.table_name, self.schema_name.as_deref())
412    }
413}
414
415/// Canonical `/gateway/delete` request shape.
416#[derive(Debug, Clone, Serialize, Deserialize)]
417pub struct GatewayDeleteRequest {
418    pub table_name: String,
419    #[serde(default, alias = "schema")]
420    pub schema_name: Option<String>,
421    pub resource_id: String,
422    #[serde(default, alias = "resource_id_column", alias = "id_column")]
423    pub column_name: Option<String>,
424}
425
426impl GatewayDeleteRequest {
427    /// Returns the normalized `schema.table` target for downstream SQL builders.
428    pub fn qualified_table_name(&self) -> Result<String, String> {
429        qualify_gateway_table_name(&self.table_name, self.schema_name.as_deref())
430    }
431}
432
433/// Supported filter operators for RPC request pushdown.
434#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
435pub enum GatewayRpcFilterOperator {
436    #[serde(rename = "eq")]
437    Eq,
438    #[serde(rename = "neq")]
439    Neq,
440    #[serde(rename = "gt")]
441    Gt,
442    #[serde(rename = "gte")]
443    Gte,
444    #[serde(rename = "lt")]
445    Lt,
446    #[serde(rename = "lte")]
447    Lte,
448    #[serde(rename = "in")]
449    In,
450    #[serde(rename = "like")]
451    Like,
452    #[serde(rename = "ilike", alias = "i_like")]
453    ILike,
454    #[serde(rename = "is")]
455    Is,
456}
457
/// Canonical RPC filter clause.
///
/// Combines a column, a comparison operator, and the JSON operand.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GatewayRpcFilter {
    /// Name of the column the filter applies to.
    pub column: String,
    /// Comparison operator to apply.
    pub operator: GatewayRpcFilterOperator,
    /// Operand for the comparison; falls back to `Value::default()` when the
    /// field is omitted from the input.
    #[serde(default)]
    pub value: Value,
}
466
/// Canonical RPC ordering clause.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GatewayRpcOrder {
    /// Name of the column to order by.
    pub column: String,
    /// Sort direction flag; `true` when the field is omitted from the input.
    #[serde(default = "default_rpc_order_ascending")]
    pub ascending: bool,
}

/// Serde default for [`GatewayRpcOrder::ascending`]: ascending order.
fn default_rpc_order_ascending() -> bool {
    true
}
478
/// Canonical `/gateway/rpc` request shape.
///
/// Only `function` is required on input; every other field has a serde
/// default.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewayRpcRequest {
    /// Name of the function to call; also accepted as `function_name`.
    #[serde(alias = "function_name")]
    pub function: String,
    /// Schema of the function; filled by `default_rpc_schema` when omitted.
    #[serde(default = "default_rpc_schema")]
    pub schema: String,
    /// Function arguments; filled by `default_rpc_args` when omitted.
    #[serde(default = "default_rpc_args")]
    pub args: Value,
    /// Optional select expression.
    // NOTE(review): the exact syntax accepted here is not visible in this file.
    #[serde(default)]
    pub select: Option<String>,
    /// Filter clauses; empty when omitted.
    #[serde(default)]
    pub filters: Vec<GatewayRpcFilter>,
    /// Optional count mode.
    // NOTE(review): accepted values are not visible in this file.
    #[serde(default)]
    pub count: Option<String>,
    /// Optional row limit.
    #[serde(default)]
    pub limit: Option<i64>,
    /// Optional row offset.
    #[serde(default)]
    pub offset: Option<i64>,
    /// Optional ordering clause.
    #[serde(default)]
    pub order: Option<GatewayRpcOrder>,
}
501
/// Serde default for the RPC request schema: `"public"`.
fn default_rpc_schema() -> String {
    String::from("public")
}
505
506fn default_rpc_args() -> Value {
507    Value::Object(Map::new())
508}
509
// Deferred queue kind strings, derived from `GatewayOperationKind::deferred_kind`
// so the constants and the enum cannot drift apart.

/// Deferred queue kind for query operations.
pub const GATEWAY_DEFERRED_KIND_QUERY: &str = GatewayOperationKind::Query.deferred_kind();
/// Deferred queue kind for fetch operations.
pub const GATEWAY_DEFERRED_KIND_FETCH: &str = GatewayOperationKind::Fetch.deferred_kind();
/// Deferred queue kind for insert operations.
pub const GATEWAY_DEFERRED_KIND_INSERT: &str = GatewayOperationKind::Insert.deferred_kind();
/// Deferred queue kind for update operations.
pub const GATEWAY_DEFERRED_KIND_UPDATE: &str = GatewayOperationKind::Update.deferred_kind();
/// Deferred queue kind for delete operations.
pub const GATEWAY_DEFERRED_KIND_DELETE: &str = GatewayOperationKind::Delete.deferred_kind();
/// Deferred queue kind for RPC operations.
pub const GATEWAY_DEFERRED_KIND_RPC: &str = GatewayOperationKind::Rpc.deferred_kind();
/// Error code string for validation failures.
pub const GATEWAY_ERROR_CODE_VALIDATION_FAILED: &str = "VALIDATION_FAILED";
/// Error code string for internal errors.
pub const GATEWAY_ERROR_CODE_INTERNAL_ERROR: &str = "INTERNAL_ERROR";
/// Error code string for an unavailable service.
pub const GATEWAY_ERROR_CODE_SERVICE_UNAVAILABLE: &str = "SERVICE_UNAVAILABLE";
519
520/// Canonical deferred queue payload for gateway operations.
521#[derive(Debug, Clone, Serialize, Deserialize)]
522pub struct GatewayDeferredRequest {
523    pub kind: String,
524    pub request_id: String,
525    pub client_name: String,
526    #[serde(skip_serializing_if = "Option::is_none")]
527    pub request_body: Option<Value>,
528    #[serde(skip_serializing_if = "Option::is_none")]
529    pub query: Option<String>,
530    #[serde(skip_serializing_if = "Option::is_none")]
531    pub reason: Option<String>,
532    #[serde(skip_serializing_if = "Option::is_none")]
533    pub requested_at_unix_ms: Option<i64>,
534}
535
536impl GatewayDeferredRequest {
537    /// Creates a deferred request for raw SQL query execution.
538    pub fn for_query(
539        request_id: impl Into<String>,
540        client_name: impl Into<String>,
541        query: impl Into<String>,
542    ) -> Self {
543        Self {
544            kind: GATEWAY_DEFERRED_KIND_QUERY.to_string(),
545            request_id: request_id.into(),
546            client_name: client_name.into(),
547            request_body: None,
548            query: Some(query.into()),
549            reason: None,
550            requested_at_unix_ms: None,
551        }
552    }
553
554    /// Creates a deferred request for JSON-body gateway operations.
555    pub fn for_request_body(
556        kind: impl Into<String>,
557        request_id: impl Into<String>,
558        client_name: impl Into<String>,
559        request_body: Value,
560    ) -> Self {
561        Self {
562            kind: kind.into(),
563            request_id: request_id.into(),
564            client_name: client_name.into(),
565            request_body: Some(request_body),
566            query: None,
567            reason: None,
568            requested_at_unix_ms: None,
569        }
570    }
571
572    /// Adds a deferral reason to the payload.
573    pub fn with_reason(mut self, reason: Option<impl Into<String>>) -> Self {
574        self.reason = reason.map(|value| value.into());
575        self
576    }
577
578    /// Sets the original enqueue timestamp in unix milliseconds.
579    pub fn with_requested_at_unix_ms(mut self, requested_at_unix_ms: i64) -> Self {
580        self.requested_at_unix_ms = Some(requested_at_unix_ms);
581        self
582    }
583
584    /// Returns the raw query text, including compatibility fallback from the JSON body.
585    pub fn query_text(&self) -> Option<String> {
586        if let Some(query) = self.query.as_ref() {
587            return Some(query.clone());
588        }
589        self.request_body
590            .as_ref()
591            .and_then(|body| body.get("query"))
592            .and_then(Value::as_str)
593            .map(str::to_string)
594    }
595
596    /// Returns the schema name if the deferred JSON body carries one.
597    pub fn schema_name(&self) -> Option<String> {
598        self.request_body.as_ref().and_then(schema_name_from_body)
599    }
600}
601
/// Shared response metadata for query-like gateway responses.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewayRowsMeta {
    /// Backend label for the response.
    // NOTE(review): presumably names the backend that executed the request — confirm with callers.
    pub backend: String,
    /// Statement count reported for the request.
    pub statement_count: usize,
    /// Rows-affected count reported for the request.
    pub rows_affected: u64,
    /// Returned-row count reported for the request.
    pub returned_row_count: usize,
}
610
611/// Shared row-based gateway success response.
612#[derive(Debug, Clone, Serialize, Deserialize)]
613pub struct GatewayRowsResponse {
614    pub data: Vec<Value>,
615    #[serde(skip_serializing_if = "Option::is_none")]
616    pub meta: Option<GatewayRowsMeta>,
617}
618
619impl GatewayRowsResponse {
620    /// Creates a row response without metadata.
621    pub fn new(data: Vec<Value>) -> Self {
622        Self { data, meta: None }
623    }
624
625    /// Attaches backend execution metadata.
626    pub fn with_meta(mut self, meta: GatewayRowsMeta) -> Self {
627        self.meta = Some(meta);
628        self
629    }
630}
631
/// Normalized gateway error response data.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GatewayErrorData {
    /// Operation label for the failed request; `GatewayErrorEnvelope::new`
    /// fills it from `GatewayOperationKind::as_str`.
    pub operation: String,
    /// Optional structured details; omitted from the serialized form when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub details: Option<Value>,
}
639
640/// Normalized gateway error response envelope.
641#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
642pub struct GatewayErrorEnvelope {
643    pub status: String,
644    pub code: String,
645    pub message: String,
646    pub error: String,
647    pub data: GatewayErrorData,
648}
649
650impl GatewayErrorEnvelope {
651    /// Creates a normalized error envelope for a gateway operation.
652    pub fn new(
653        code: impl Into<String>,
654        operation: GatewayOperationKind,
655        message: impl Into<String>,
656        error: impl Into<String>,
657        details: Option<Value>,
658    ) -> Self {
659        Self {
660            status: "error".to_string(),
661            code: code.into(),
662            message: message.into(),
663            error: error.into(),
664            data: GatewayErrorData {
665                operation: operation.as_str().to_string(),
666                details,
667            },
668        }
669    }
670}
671
672/// Parses `columns` from gateway body, supporting both CSV and JSON arrays.
673pub fn parse_columns_from_body(body: &Value) -> Vec<String> {
674    let Some(columns_value) = body.get("columns") else {
675        return Vec::new();
676    };
677    if let Some(array) = columns_value.as_array() {
678        return array
679            .iter()
680            .filter_map(|value| value.as_str().map(str::to_string))
681            .collect();
682    }
683    if let Some(csv) = columns_value.as_str() {
684        return csv
685            .split(',')
686            .map(str::trim)
687            .filter(|token| !token.is_empty())
688            .map(str::to_string)
689            .collect();
690    }
691    Vec::new()
692}
693
694/// Parses request conditions from JSON body.
695pub fn parse_conditions_from_body(body: &Value) -> Vec<GatewayRequestCondition> {
696    body.get("conditions")
697        .and_then(Value::as_array)
698        .map(|conditions| {
699            conditions
700                .iter()
701                .filter_map(|condition| {
702                    let eq_column = condition
703                        .get("eq_column")
704                        .and_then(Value::as_str)
705                        .map(str::to_string)?;
706                    let eq_value = condition.get("eq_value")?.clone();
707                    Some(GatewayRequestCondition {
708                        eq_column,
709                        eq_value,
710                    })
711                })
712                .collect()
713        })
714        .unwrap_or_default()
715}
716
717/// Extracts normalized update payload from gateway body.
718pub fn extract_update_payload(
719    body: &Value,
720    force_camel_case_to_snake_case: bool,
721) -> Option<Map<String, Value>> {
722    let mut payload = Map::new();
723    if let Some(columns) = body.get("columns").and_then(Value::as_array) {
724        for object_value in columns {
725            if let Some(object) = object_value.as_object() {
726                for (key, value) in object {
727                    let normalized_key = if force_camel_case_to_snake_case {
728                        normalize_column_name(key, true)
729                    } else {
730                        key.clone()
731                    };
732                    payload.insert(normalized_key, value.clone());
733                }
734            }
735        }
736    } else if let Some(data) = body.get("data").and_then(Value::as_object) {
737        for (key, value) in data {
738            let normalized_key = if force_camel_case_to_snake_case {
739                normalize_column_name(key, true)
740            } else {
741                key.clone()
742            };
743            payload.insert(normalized_key, value.clone());
744        }
745    } else if let Some(set) = body.get("set").and_then(Value::as_object) {
746        for (key, value) in set {
747            let normalized_key = if force_camel_case_to_snake_case {
748                normalize_column_name(key, true)
749            } else {
750                key.clone()
751            };
752            payload.insert(normalized_key, value.clone());
753        }
754    } else {
755        return None;
756    }
757
758    if payload.is_empty() {
759        return None;
760    }
761
762    Some(payload)
763}
764
765/// Reads either `schema_name` or the compatibility alias `schema` from a request body.
766pub fn schema_name_from_body(body: &Value) -> Option<String> {
767    body.get("schema_name")
768        .or_else(|| body.get("schema"))
769        .and_then(Value::as_str)
770        .map(str::trim)
771        .filter(|value| !value.is_empty())
772        .map(str::to_string)
773}
774
775/// Validates and normalizes an optional schema name.
776pub fn normalize_gateway_schema_name(schema_name: Option<&str>) -> Result<Option<String>, String> {
777    let Some(raw_schema_name) = schema_name else {
778        return Ok(None);
779    };
780    let trimmed = raw_schema_name.trim();
781    if trimmed.is_empty() {
782        return Err("schema_name must not be empty".to_string());
783    }
784    let sanitized = sanitize_identifier(trimmed)
785        .ok_or_else(|| "schema_name must be a valid SQL identifier".to_string())?;
786    Ok(Some(sanitized.trim_matches('"').to_string()))
787}
788
789/// Builds the normalized `schema.table` target used by downstream SQL builders.
790pub fn qualify_gateway_table_name(
791    table_name: &str,
792    schema_name: Option<&str>,
793) -> Result<String, String> {
794    let trimmed_table_name = table_name.trim();
795    if trimmed_table_name.is_empty() {
796        return Err("table_name is required".to_string());
797    }
798
799    let normalized_schema_name = normalize_gateway_schema_name(schema_name)?;
800    match normalized_schema_name {
801        Some(schema_name) => {
802            if trimmed_table_name.contains('.') {
803                return Err(
804                    "table_name must not include a schema prefix when schema_name is provided"
805                        .to_string(),
806                );
807            }
808
809            sanitize_identifier(trimmed_table_name)
810                .ok_or_else(|| "table_name must be a valid SQL identifier".to_string())?;
811            sanitize_identifier(&schema_name)
812                .ok_or_else(|| "schema_name must be a valid SQL identifier".to_string())?;
813
814            Ok(format!("{schema_name}.{trimmed_table_name}"))
815        }
816        None => Ok(trimmed_table_name.to_string()),
817    }
818}
819
// Unit tests for the schema/table normalization helpers, request body
// parsing, and the error envelope defined in this file.
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    // A plain identifier is accepted and returned unchanged.
    #[test]
    fn normalize_gateway_schema_name_accepts_valid_identifier() {
        assert_eq!(
            normalize_gateway_schema_name(Some("analytics")).expect("schema should be valid"),
            Some("analytics".to_string())
        );
    }

    // Input containing `;` and spaces is rejected.
    #[test]
    fn normalize_gateway_schema_name_rejects_invalid_identifier() {
        let err = normalize_gateway_schema_name(Some("public;drop schema public"))
            .expect_err("invalid identifier should fail");
        assert!(err.contains("valid SQL identifier"));
    }

    // Schema and table are joined with a dot.
    #[test]
    fn qualify_gateway_table_name_with_schema_builds_qualified_target() {
        let qualified = qualify_gateway_table_name("events", Some("analytics"))
            .expect("qualifying table should succeed");
        assert_eq!(qualified, "analytics.events");
    }

    // A dotted table name combined with an explicit schema is an error.
    #[test]
    fn qualify_gateway_table_name_rejects_ambiguous_schema_sources() {
        let err = qualify_gateway_table_name("public.events", Some("analytics"))
            .expect_err("schema_name with qualified table should fail");
        assert!(err.contains("must not include a schema prefix"));
    }

    // Both the canonical key and the `schema` alias are read.
    #[test]
    fn schema_name_from_body_supports_alias() {
        assert_eq!(
            schema_name_from_body(&json!({ "schema": "analytics" })),
            Some("analytics".to_string())
        );
        assert_eq!(
            schema_name_from_body(&json!({ "schema_name": "reporting" })),
            Some("reporting".to_string())
        );
    }

    // With the force flag set, camelCase columns become snake_case.
    #[test]
    fn fetch_request_from_body_normalizes_columns_when_forced() {
        let request = GatewayFetchRequest::from_body(
            &json!({
                "table_name": "events",
                "columns": ["userId", "createdAt"],
            }),
            true,
        );
        assert_eq!(request.columns, vec!["user_id", "created_at"]);
    }

    // `set` is accepted as a payload source and its keys are normalized.
    #[test]
    fn update_payload_supports_set_alias() {
        let payload = extract_update_payload(
            &json!({
                "set": {
                    "displayName": "Athena",
                }
            }),
            true,
        )
        .expect("payload should parse");
        assert_eq!(
            payload.get("display_name"),
            Some(&Value::String("Athena".to_string()))
        );
    }

    // The constructor fixes `status` and records the operation's route key.
    #[test]
    fn gateway_error_envelope_serializes_expected_shape() {
        let envelope = GatewayErrorEnvelope::new(
            GATEWAY_ERROR_CODE_VALIDATION_FAILED,
            GatewayOperationKind::Fetch,
            "Invalid request",
            "table_name is required",
            Some(json!({ "field": "table_name" })),
        );
        assert_eq!(envelope.status, "error");
        assert_eq!(envelope.data.operation, "fetch");
        assert_eq!(
            envelope.data.details,
            Some(json!({ "field": "table_name" }))
        );
    }
}
911}