1mod auth_rights;
33mod delete;
34mod fetch;
35mod fetch_body_plan;
36mod fetch_conditions;
37mod fetch_get_compat;
38mod fetch_singleflight;
39mod postgrest;
40mod query_plan;
41mod relation_select_compatibility;
42mod resource_id;
43mod rpc_compat;
44mod sql;
45mod structured_fetch;
46mod structured_fetch_sql;
47mod update_plan;
48
49use serde::{Deserialize, Serialize};
50use serde_json::{Map, Value};
51use std::str::FromStr;
52
53fn sanitize_identifier(identifier: &str) -> Option<String> {
54 let mut chars = identifier.chars();
55 let first = chars.next()?;
56 if !(first.is_ascii_alphabetic() || first == '_') {
57 return None;
58 }
59 if !chars.all(|ch| ch.is_ascii_alphanumeric() || ch == '_') {
60 return None;
61 }
62 Some(format!("\"{identifier}\""))
63}
64
65pub fn camel_to_snake_case(input: &str) -> String {
67 let mut snake = String::with_capacity(input.len() * 2);
68 let mut chars = input.chars().peekable();
69 let mut previous: Option<char> = None;
70
71 while let Some(ch) = chars.next() {
72 if ch.is_ascii_uppercase() {
73 if let Some(prev) = previous {
74 let prev_is_lower_or_digit = prev.is_ascii_lowercase() || prev.is_ascii_digit();
75 let next_is_lower = chars
76 .peek()
77 .map(|next| next.is_ascii_lowercase())
78 .unwrap_or(false);
79
80 if prev_is_lower_or_digit || (prev.is_ascii_uppercase() && next_is_lower) {
81 snake.push('_');
82 }
83 }
84 snake.push(ch.to_ascii_lowercase());
85 } else {
86 snake.push(ch);
87 }
88
89 previous = Some(ch);
90 }
91
92 snake
93}
94
95pub fn normalize_column_name(column: &str, force: bool) -> String {
97 if !force || column == "*" {
98 return column.to_string();
99 }
100
101 camel_to_snake_case(column)
102}
103
104pub use auth_rights::{
105 delete_right_for_resource, missing_required_rights, query_right, read_right_for_resource,
106 right_matches, rpc_right, storage_proxy_right, typesense_proxy_right, write_right_for_resource,
107};
108pub use delete::{
109 GatewayDeleteRequestPlan, GatewayDeleteRequestPlanError, GatewayDeleteResourceIdPlan,
110 build_gateway_delete_request_plan, delete_request_required_right,
111 normalize_delete_resource_id_column_name, normalize_delete_resource_id_lookup_table,
112 plan_delete_resource_id_resolution,
113};
114pub use fetch::{
115 AggregationStrategy, GatewayFetchConditionError, PostProcessingConfig, SortOptions,
116 TimeGranularity, apply_post_processing, build_fetch_hashed_cache_key,
117 build_fetch_hashed_cache_key_legacy8, coerce_room_id_eq_value, parse_gateway_fetch_conditions,
118 parse_room_id_value, parse_sort_options_from_body,
119};
120pub use fetch_body_plan::{
121 GatewayFetchBodyPlan, GatewayFetchBodyPlanError, build_gateway_fetch_body_plan,
122};
123pub use fetch_conditions::{
124 RequestCondition, merge_column_types_with_stats_fallback, resolve_where_column_types,
125 to_query_conditions, to_query_conditions_with_types,
126};
127pub use fetch_get_compat::{
128 GatewayGetFetchCompatibilityError, GatewayGetFetchCompatibilityPlan,
129 build_gateway_get_fetch_compatibility_plan,
130};
131pub use fetch_singleflight::{
132 GatewayFetchRowsResult, GatewayFetchSingleflight, GatewayFetchSingleflightRole,
133 GatewayInFlightFetch,
134};
135pub use postgrest::{
136 OrderSpec, PostgrestFilter, PostgrestFilterOperator, PostgrestQuery,
137 convert_postgrest_filter_to_condition, convert_postgrest_filters_to_conditions,
138 convert_postgrest_or_filter_groups_to_conditions, parse_postgrest_query,
139};
140pub use query_plan::{
141 GatewayQueryCompatibilityPlan, GatewayQueryRequestParseError, GatewayQueryRequestPlan,
142 GatewayQueryRequestPlanError, build_gateway_query_request_plan,
143 parse_gateway_query_request_body,
144};
145pub use relation_select_compatibility::{
146 GatewayRelationSelectRewrite, GatewayRelationSelectTableRef, try_rewrite_relation_select_query,
147};
148pub use resource_id::{
149 find_closest_uuid_column, get_resource_id_key_with_uuid_loader,
150 parse_uuid_columns_from_schema_rows,
151};
152pub use rpc_compat::{
153 parse_rpc_argument_value, parse_rpc_filter_expression, parse_rpc_order,
154 rpc_request_from_get_compat, rpc_request_from_post_compat,
155};
156pub use sql::{
157 GatewayApiRequest, GatewayApiRequestPayload, GatewaySqlExecutionMode,
158 GatewaySqlExecutionRequest, GatewaySqlRequest,
159};
160pub use structured_fetch::{
161 StructuredColumnField, StructuredFilter, StructuredFilterOperator, StructuredGatewayFetchPlan,
162 StructuredJoinKind, StructuredOrderBy, StructuredRelationField, StructuredSelectField,
163 StructuredSelectOperation, StructuredSelectQuery, StructuredSortDirection,
164 build_structured_fetch_cache_key, build_structured_fetch_plan,
165};
166pub use structured_fetch_sql::{execute_structured_fetch_sql, render_structured_fetch_sql};
167pub use update_plan::{
168 GatewayUpdateRequestPlan, GatewayUpdateRequestPlanError, build_gateway_update_request_plan,
169};
170
171#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
173#[serde(rename_all = "snake_case")]
174pub enum GatewayOperationKind {
175 Fetch,
176 Insert,
177 Update,
178 Delete,
179 Query,
180 Rpc,
181}
182
183impl GatewayOperationKind {
184 pub const fn as_str(self) -> &'static str {
186 match self {
187 Self::Fetch => "fetch",
188 Self::Insert => "insert",
189 Self::Update => "update",
190 Self::Delete => "delete",
191 Self::Query => "query",
192 Self::Rpc => "rpc",
193 }
194 }
195
196 pub const fn deferred_kind(self) -> &'static str {
198 match self {
199 Self::Fetch => "gateway_fetch",
200 Self::Insert => "gateway_insert",
201 Self::Update => "gateway_update",
202 Self::Delete => "gateway_delete",
203 Self::Query => "gateway_query",
204 Self::Rpc => "gateway_rpc",
205 }
206 }
207
208 pub fn from_public_route_op(raw: &str) -> Option<Self> {
210 match raw.trim().to_ascii_lowercase().as_str() {
211 "fetch" => Some(Self::Fetch),
212 "insert" => Some(Self::Insert),
213 "update" => Some(Self::Update),
214 "delete" => Some(Self::Delete),
215 "query" => Some(Self::Query),
216 "rpc" => Some(Self::Rpc),
217 _ => None,
218 }
219 }
220
221 pub fn from_deferred_kind(raw: &str) -> Option<Self> {
223 match raw.trim().to_ascii_lowercase().as_str() {
224 "gateway_fetch" => Some(Self::Fetch),
225 "gateway_insert" => Some(Self::Insert),
226 "gateway_update" => Some(Self::Update),
227 "gateway_delete" => Some(Self::Delete),
228 "gateway_query" => Some(Self::Query),
229 "gateway_rpc" => Some(Self::Rpc),
230 _ => None,
231 }
232 }
233}
234
235impl FromStr for GatewayOperationKind {
236 type Err = ();
237
238 fn from_str(value: &str) -> Result<Self, Self::Err> {
239 Self::from_public_route_op(value).ok_or(())
240 }
241}
242
243#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
245pub struct GatewayRequestCondition {
246 pub eq_column: String,
247 pub eq_value: Value,
248}
249
250impl GatewayRequestCondition {
251 pub fn new(eq_column: String, eq_value: Value) -> Self {
253 Self {
254 eq_column,
255 eq_value,
256 }
257 }
258}
259
260#[derive(Debug, Clone, Serialize, Deserialize)]
262pub struct GatewayFetchRequest {
263 pub table_name: String,
264 #[serde(default, alias = "schema")]
265 pub schema_name: Option<String>,
266 #[serde(default)]
267 pub columns: Vec<String>,
268 #[serde(default)]
269 pub conditions: Vec<GatewayRequestCondition>,
270 #[serde(default)]
271 pub current_page: Option<i64>,
272 #[serde(default)]
273 pub page_size: Option<i64>,
274 #[serde(default)]
275 pub limit: Option<i64>,
276 #[serde(default)]
277 pub offset: Option<i64>,
278}
279
280impl GatewayFetchRequest {
281 pub fn from_body(body: &Value, force_camel_case_to_snake_case: bool) -> Self {
284 let table_name = body
285 .get("table_name")
286 .and_then(Value::as_str)
287 .unwrap_or_default()
288 .to_string();
289 let schema_name = schema_name_from_body(body);
290
291 let mut columns = parse_columns_from_body(body);
292 if columns.is_empty() {
293 columns.push("*".to_string());
294 }
295 if force_camel_case_to_snake_case {
296 columns = columns
297 .into_iter()
298 .map(|column| normalize_column_name(&column, true))
299 .collect();
300 }
301
302 let conditions = parse_conditions_from_body(body);
303 let current_page = body.get("current_page").and_then(Value::as_i64);
304 let page_size = body.get("page_size").and_then(Value::as_i64);
305 let limit = body.get("limit").and_then(Value::as_i64);
306 let offset = body.get("offset").and_then(Value::as_i64);
307
308 Self {
309 table_name,
310 schema_name,
311 columns,
312 conditions,
313 current_page,
314 page_size,
315 limit,
316 offset,
317 }
318 }
319
320 pub fn qualified_table_name(&self) -> Result<String, String> {
322 qualify_gateway_table_name(&self.table_name, self.schema_name.as_deref())
323 }
324}
325
326#[derive(Debug, Clone, Serialize, Deserialize)]
328pub struct GatewayInsertRequest {
329 pub table_name: String,
330 #[serde(default, alias = "schema")]
331 pub schema_name: Option<String>,
332 pub insert_body: Value,
333 #[serde(default)]
334 pub update_body: Option<Value>,
335}
336
337impl GatewayInsertRequest {
338 pub fn table_name_from_body(body: &Value) -> Option<String> {
340 body.get("table_name")
341 .and_then(Value::as_str)
342 .map(str::to_string)
343 .filter(|name| !name.trim().is_empty())
344 }
345
346 pub fn from_body(body: &Value) -> Option<Self> {
350 let table_name = Self::table_name_from_body(body)?;
351 let insert_body = Self::insert_body_from_body(body)?;
352 let update_body = body.get("update_body").cloned();
353 let schema_name = schema_name_from_body(body);
354
355 Some(Self {
356 table_name,
357 schema_name,
358 insert_body,
359 update_body,
360 })
361 }
362
363 pub fn insert_body_from_body(body: &Value) -> Option<Value> {
365 body.get("insert_body")
366 .cloned()
367 .or_else(|| body.get("data").cloned())
368 }
369
370 pub fn qualified_table_name(&self) -> Result<String, String> {
372 qualify_gateway_table_name(&self.table_name, self.schema_name.as_deref())
373 }
374}
375
376#[derive(Debug, Clone, Serialize, Deserialize)]
378pub struct GatewayUpdateRequest {
379 pub table_name: String,
380 #[serde(default, alias = "schema")]
381 pub schema_name: Option<String>,
382 #[serde(default)]
383 pub conditions: Vec<GatewayRequestCondition>,
384 pub data: Value,
385}
386
387impl GatewayUpdateRequest {
388 pub fn from_body(body: &Value, force_camel_case_to_snake_case: bool) -> Option<Self> {
392 let table_name = body
393 .get("table_name")
394 .and_then(Value::as_str)
395 .map(str::to_string)
396 .filter(|name| !name.trim().is_empty())?;
397 let set_payload = extract_update_payload(body, force_camel_case_to_snake_case)?;
398 let conditions = parse_conditions_from_body(body);
399 let schema_name = schema_name_from_body(body);
400
401 Some(Self {
402 table_name,
403 schema_name,
404 conditions,
405 data: Value::Object(set_payload),
406 })
407 }
408
409 pub fn qualified_table_name(&self) -> Result<String, String> {
411 qualify_gateway_table_name(&self.table_name, self.schema_name.as_deref())
412 }
413}
414
415#[derive(Debug, Clone, Serialize, Deserialize)]
417pub struct GatewayDeleteRequest {
418 pub table_name: String,
419 #[serde(default, alias = "schema")]
420 pub schema_name: Option<String>,
421 pub resource_id: String,
422 #[serde(default, alias = "resource_id_column", alias = "id_column")]
423 pub column_name: Option<String>,
424}
425
426impl GatewayDeleteRequest {
427 pub fn qualified_table_name(&self) -> Result<String, String> {
429 qualify_gateway_table_name(&self.table_name, self.schema_name.as_deref())
430 }
431}
432
433#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
435pub enum GatewayRpcFilterOperator {
436 #[serde(rename = "eq")]
437 Eq,
438 #[serde(rename = "neq")]
439 Neq,
440 #[serde(rename = "gt")]
441 Gt,
442 #[serde(rename = "gte")]
443 Gte,
444 #[serde(rename = "lt")]
445 Lt,
446 #[serde(rename = "lte")]
447 Lte,
448 #[serde(rename = "in")]
449 In,
450 #[serde(rename = "like")]
451 Like,
452 #[serde(rename = "ilike", alias = "i_like")]
453 ILike,
454 #[serde(rename = "is")]
455 Is,
456}
457
458#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
460pub struct GatewayRpcFilter {
461 pub column: String,
462 pub operator: GatewayRpcFilterOperator,
463 #[serde(default)]
464 pub value: Value,
465}
466
467#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
469pub struct GatewayRpcOrder {
470 pub column: String,
471 #[serde(default = "default_rpc_order_ascending")]
472 pub ascending: bool,
473}
474
475fn default_rpc_order_ascending() -> bool {
476 true
477}
478
479#[derive(Debug, Clone, Serialize, Deserialize)]
481pub struct GatewayRpcRequest {
482 #[serde(alias = "function_name")]
483 pub function: String,
484 #[serde(default = "default_rpc_schema")]
485 pub schema: String,
486 #[serde(default = "default_rpc_args")]
487 pub args: Value,
488 #[serde(default)]
489 pub select: Option<String>,
490 #[serde(default)]
491 pub filters: Vec<GatewayRpcFilter>,
492 #[serde(default)]
493 pub count: Option<String>,
494 #[serde(default)]
495 pub limit: Option<i64>,
496 #[serde(default)]
497 pub offset: Option<i64>,
498 #[serde(default)]
499 pub order: Option<GatewayRpcOrder>,
500}
501
502fn default_rpc_schema() -> String {
503 "public".to_string()
504}
505
506fn default_rpc_args() -> Value {
507 Value::Object(Map::new())
508}
509
510pub const GATEWAY_DEFERRED_KIND_QUERY: &str = GatewayOperationKind::Query.deferred_kind();
511pub const GATEWAY_DEFERRED_KIND_FETCH: &str = GatewayOperationKind::Fetch.deferred_kind();
512pub const GATEWAY_DEFERRED_KIND_INSERT: &str = GatewayOperationKind::Insert.deferred_kind();
513pub const GATEWAY_DEFERRED_KIND_UPDATE: &str = GatewayOperationKind::Update.deferred_kind();
514pub const GATEWAY_DEFERRED_KIND_DELETE: &str = GatewayOperationKind::Delete.deferred_kind();
515pub const GATEWAY_DEFERRED_KIND_RPC: &str = GatewayOperationKind::Rpc.deferred_kind();
516pub const GATEWAY_ERROR_CODE_VALIDATION_FAILED: &str = "VALIDATION_FAILED";
517pub const GATEWAY_ERROR_CODE_INTERNAL_ERROR: &str = "INTERNAL_ERROR";
518pub const GATEWAY_ERROR_CODE_SERVICE_UNAVAILABLE: &str = "SERVICE_UNAVAILABLE";
519
520#[derive(Debug, Clone, Serialize, Deserialize)]
522pub struct GatewayDeferredRequest {
523 pub kind: String,
524 pub request_id: String,
525 pub client_name: String,
526 #[serde(skip_serializing_if = "Option::is_none")]
527 pub request_body: Option<Value>,
528 #[serde(skip_serializing_if = "Option::is_none")]
529 pub query: Option<String>,
530 #[serde(skip_serializing_if = "Option::is_none")]
531 pub reason: Option<String>,
532 #[serde(skip_serializing_if = "Option::is_none")]
533 pub requested_at_unix_ms: Option<i64>,
534}
535
536impl GatewayDeferredRequest {
537 pub fn for_query(
539 request_id: impl Into<String>,
540 client_name: impl Into<String>,
541 query: impl Into<String>,
542 ) -> Self {
543 Self {
544 kind: GATEWAY_DEFERRED_KIND_QUERY.to_string(),
545 request_id: request_id.into(),
546 client_name: client_name.into(),
547 request_body: None,
548 query: Some(query.into()),
549 reason: None,
550 requested_at_unix_ms: None,
551 }
552 }
553
554 pub fn for_request_body(
556 kind: impl Into<String>,
557 request_id: impl Into<String>,
558 client_name: impl Into<String>,
559 request_body: Value,
560 ) -> Self {
561 Self {
562 kind: kind.into(),
563 request_id: request_id.into(),
564 client_name: client_name.into(),
565 request_body: Some(request_body),
566 query: None,
567 reason: None,
568 requested_at_unix_ms: None,
569 }
570 }
571
572 pub fn with_reason(mut self, reason: Option<impl Into<String>>) -> Self {
574 self.reason = reason.map(|value| value.into());
575 self
576 }
577
578 pub fn with_requested_at_unix_ms(mut self, requested_at_unix_ms: i64) -> Self {
580 self.requested_at_unix_ms = Some(requested_at_unix_ms);
581 self
582 }
583
584 pub fn query_text(&self) -> Option<String> {
586 if let Some(query) = self.query.as_ref() {
587 return Some(query.clone());
588 }
589 self.request_body
590 .as_ref()
591 .and_then(|body| body.get("query"))
592 .and_then(Value::as_str)
593 .map(str::to_string)
594 }
595
596 pub fn schema_name(&self) -> Option<String> {
598 self.request_body.as_ref().and_then(schema_name_from_body)
599 }
600}
601
602#[derive(Debug, Clone, Serialize, Deserialize)]
604pub struct GatewayRowsMeta {
605 pub backend: String,
606 pub statement_count: usize,
607 pub rows_affected: u64,
608 pub returned_row_count: usize,
609}
610
611#[derive(Debug, Clone, Serialize, Deserialize)]
613pub struct GatewayRowsResponse {
614 pub data: Vec<Value>,
615 #[serde(skip_serializing_if = "Option::is_none")]
616 pub meta: Option<GatewayRowsMeta>,
617}
618
619impl GatewayRowsResponse {
620 pub fn new(data: Vec<Value>) -> Self {
622 Self { data, meta: None }
623 }
624
625 pub fn with_meta(mut self, meta: GatewayRowsMeta) -> Self {
627 self.meta = Some(meta);
628 self
629 }
630}
631
632#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
634pub struct GatewayErrorData {
635 pub operation: String,
636 #[serde(skip_serializing_if = "Option::is_none")]
637 pub details: Option<Value>,
638}
639
640#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
642pub struct GatewayErrorEnvelope {
643 pub status: String,
644 pub code: String,
645 pub message: String,
646 pub error: String,
647 pub data: GatewayErrorData,
648}
649
650impl GatewayErrorEnvelope {
651 pub fn new(
653 code: impl Into<String>,
654 operation: GatewayOperationKind,
655 message: impl Into<String>,
656 error: impl Into<String>,
657 details: Option<Value>,
658 ) -> Self {
659 Self {
660 status: "error".to_string(),
661 code: code.into(),
662 message: message.into(),
663 error: error.into(),
664 data: GatewayErrorData {
665 operation: operation.as_str().to_string(),
666 details,
667 },
668 }
669 }
670}
671
672pub fn parse_columns_from_body(body: &Value) -> Vec<String> {
674 let Some(columns_value) = body.get("columns") else {
675 return Vec::new();
676 };
677 if let Some(array) = columns_value.as_array() {
678 return array
679 .iter()
680 .filter_map(|value| value.as_str().map(str::to_string))
681 .collect();
682 }
683 if let Some(csv) = columns_value.as_str() {
684 return csv
685 .split(',')
686 .map(str::trim)
687 .filter(|token| !token.is_empty())
688 .map(str::to_string)
689 .collect();
690 }
691 Vec::new()
692}
693
694pub fn parse_conditions_from_body(body: &Value) -> Vec<GatewayRequestCondition> {
696 body.get("conditions")
697 .and_then(Value::as_array)
698 .map(|conditions| {
699 conditions
700 .iter()
701 .filter_map(|condition| {
702 let eq_column = condition
703 .get("eq_column")
704 .and_then(Value::as_str)
705 .map(str::to_string)?;
706 let eq_value = condition.get("eq_value")?.clone();
707 Some(GatewayRequestCondition {
708 eq_column,
709 eq_value,
710 })
711 })
712 .collect()
713 })
714 .unwrap_or_default()
715}
716
717pub fn extract_update_payload(
719 body: &Value,
720 force_camel_case_to_snake_case: bool,
721) -> Option<Map<String, Value>> {
722 let mut payload = Map::new();
723 if let Some(columns) = body.get("columns").and_then(Value::as_array) {
724 for object_value in columns {
725 if let Some(object) = object_value.as_object() {
726 for (key, value) in object {
727 let normalized_key = if force_camel_case_to_snake_case {
728 normalize_column_name(key, true)
729 } else {
730 key.clone()
731 };
732 payload.insert(normalized_key, value.clone());
733 }
734 }
735 }
736 } else if let Some(data) = body.get("data").and_then(Value::as_object) {
737 for (key, value) in data {
738 let normalized_key = if force_camel_case_to_snake_case {
739 normalize_column_name(key, true)
740 } else {
741 key.clone()
742 };
743 payload.insert(normalized_key, value.clone());
744 }
745 } else if let Some(set) = body.get("set").and_then(Value::as_object) {
746 for (key, value) in set {
747 let normalized_key = if force_camel_case_to_snake_case {
748 normalize_column_name(key, true)
749 } else {
750 key.clone()
751 };
752 payload.insert(normalized_key, value.clone());
753 }
754 } else {
755 return None;
756 }
757
758 if payload.is_empty() {
759 return None;
760 }
761
762 Some(payload)
763}
764
765pub fn schema_name_from_body(body: &Value) -> Option<String> {
767 body.get("schema_name")
768 .or_else(|| body.get("schema"))
769 .and_then(Value::as_str)
770 .map(str::trim)
771 .filter(|value| !value.is_empty())
772 .map(str::to_string)
773}
774
775pub fn normalize_gateway_schema_name(schema_name: Option<&str>) -> Result<Option<String>, String> {
777 let Some(raw_schema_name) = schema_name else {
778 return Ok(None);
779 };
780 let trimmed = raw_schema_name.trim();
781 if trimmed.is_empty() {
782 return Err("schema_name must not be empty".to_string());
783 }
784 let sanitized = sanitize_identifier(trimmed)
785 .ok_or_else(|| "schema_name must be a valid SQL identifier".to_string())?;
786 Ok(Some(sanitized.trim_matches('"').to_string()))
787}
788
789pub fn qualify_gateway_table_name(
791 table_name: &str,
792 schema_name: Option<&str>,
793) -> Result<String, String> {
794 let trimmed_table_name = table_name.trim();
795 if trimmed_table_name.is_empty() {
796 return Err("table_name is required".to_string());
797 }
798
799 let normalized_schema_name = normalize_gateway_schema_name(schema_name)?;
800 match normalized_schema_name {
801 Some(schema_name) => {
802 if trimmed_table_name.contains('.') {
803 return Err(
804 "table_name must not include a schema prefix when schema_name is provided"
805 .to_string(),
806 );
807 }
808
809 sanitize_identifier(trimmed_table_name)
810 .ok_or_else(|| "table_name must be a valid SQL identifier".to_string())?;
811 sanitize_identifier(&schema_name)
812 .ok_or_else(|| "schema_name must be a valid SQL identifier".to_string())?;
813
814 Ok(format!("{schema_name}.{trimmed_table_name}"))
815 }
816 None => Ok(trimmed_table_name.to_string()),
817 }
818}
819
820#[cfg(test)]
821mod tests {
822 use super::*;
823 use serde_json::json;
824
825 #[test]
826 fn normalize_gateway_schema_name_accepts_valid_identifier() {
827 assert_eq!(
828 normalize_gateway_schema_name(Some("analytics")).expect("schema should be valid"),
829 Some("analytics".to_string())
830 );
831 }
832
833 #[test]
834 fn normalize_gateway_schema_name_rejects_invalid_identifier() {
835 let err = normalize_gateway_schema_name(Some("public;drop schema public"))
836 .expect_err("invalid identifier should fail");
837 assert!(err.contains("valid SQL identifier"));
838 }
839
840 #[test]
841 fn qualify_gateway_table_name_with_schema_builds_qualified_target() {
842 let qualified = qualify_gateway_table_name("events", Some("analytics"))
843 .expect("qualifying table should succeed");
844 assert_eq!(qualified, "analytics.events");
845 }
846
847 #[test]
848 fn qualify_gateway_table_name_rejects_ambiguous_schema_sources() {
849 let err = qualify_gateway_table_name("public.events", Some("analytics"))
850 .expect_err("schema_name with qualified table should fail");
851 assert!(err.contains("must not include a schema prefix"));
852 }
853
854 #[test]
855 fn schema_name_from_body_supports_alias() {
856 assert_eq!(
857 schema_name_from_body(&json!({ "schema": "analytics" })),
858 Some("analytics".to_string())
859 );
860 assert_eq!(
861 schema_name_from_body(&json!({ "schema_name": "reporting" })),
862 Some("reporting".to_string())
863 );
864 }
865
866 #[test]
867 fn fetch_request_from_body_normalizes_columns_when_forced() {
868 let request = GatewayFetchRequest::from_body(
869 &json!({
870 "table_name": "events",
871 "columns": ["userId", "createdAt"],
872 }),
873 true,
874 );
875 assert_eq!(request.columns, vec!["user_id", "created_at"]);
876 }
877
878 #[test]
879 fn update_payload_supports_set_alias() {
880 let payload = extract_update_payload(
881 &json!({
882 "set": {
883 "displayName": "Athena",
884 }
885 }),
886 true,
887 )
888 .expect("payload should parse");
889 assert_eq!(
890 payload.get("display_name"),
891 Some(&Value::String("Athena".to_string()))
892 );
893 }
894
895 #[test]
896 fn gateway_error_envelope_serializes_expected_shape() {
897 let envelope = GatewayErrorEnvelope::new(
898 GATEWAY_ERROR_CODE_VALIDATION_FAILED,
899 GatewayOperationKind::Fetch,
900 "Invalid request",
901 "table_name is required",
902 Some(json!({ "field": "table_name" })),
903 );
904 assert_eq!(envelope.status, "error");
905 assert_eq!(envelope.data.operation, "fetch");
906 assert_eq!(
907 envelope.data.details,
908 Some(json!({ "field": "table_name" }))
909 );
910 }
911}