use crate::utils::format::normalize_column_name;
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
/// A column/value pair attached to a gateway request.
///
/// Instances are produced by `parse_conditions_from_body` from the entries of a
/// request body's `conditions` array.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GatewayRequestCondition {
/// Name of the column the condition refers to.
pub eq_column: String,
/// JSON value paired with `eq_column`.
// NOTE(review): presumably compared for equality by the query builder — confirm.
pub eq_value: Value,
}
impl GatewayRequestCondition {
pub fn new(eq_column: String, eq_value: Value) -> Self {
Self {
eq_column,
eq_value,
}
}
}
/// Parameters of a gateway fetch against a single table.
///
/// Every field except `table_name` falls back to its `Default` when missing
/// from the deserialized input.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewayFetchRequest {
/// Name of the table to read from.
pub table_name: String,
/// Columns to return. `from_body` substitutes `*` when the body names none.
#[serde(default)]
pub columns: Vec<String>,
/// Column/value conditions attached to the fetch.
#[serde(default)]
pub conditions: Vec<GatewayRequestCondition>,
/// Optional page number.
// NOTE(review): whether paging is 0- or 1-based is decided by the consumer — confirm.
#[serde(default)]
pub current_page: Option<i64>,
/// Optional page size, used together with `current_page`.
#[serde(default)]
pub page_size: Option<i64>,
/// Optional row limit.
#[serde(default)]
pub limit: Option<i64>,
/// Optional row offset.
#[serde(default)]
pub offset: Option<i64>,
}
impl GatewayFetchRequest {
    /// Builds a fetch request from a loosely-typed JSON body.
    ///
    /// * `table_name` — a missing or non-string value becomes an empty string.
    /// * `columns` — parsed with `parse_columns_from_body`; when nothing is
    ///   named the list becomes `["*"]`. With `force_camel_case_to_snake_case`
    ///   every entry is passed through `normalize_column_name(_, true)`.
    /// * `conditions` — parsed with `parse_conditions_from_body`.
    /// * `current_page`, `page_size`, `limit`, `offset` — kept only when the
    ///   body holds a value readable as `i64`.
    pub fn from_body(body: &Value, force_camel_case_to_snake_case: bool) -> Self {
        let read_i64 = |key: &str| body.get(key).and_then(Value::as_i64);

        let requested = parse_columns_from_body(body);
        let selected = if requested.is_empty() {
            vec!["*".to_string()]
        } else {
            requested
        };
        let columns: Vec<String> = if force_camel_case_to_snake_case {
            selected
                .iter()
                .map(|name| normalize_column_name(name, true))
                .collect()
        } else {
            selected
        };

        Self {
            table_name: body
                .get("table_name")
                .and_then(Value::as_str)
                .map(str::to_string)
                .unwrap_or_default(),
            columns,
            conditions: parse_conditions_from_body(body),
            current_page: read_i64("current_page"),
            page_size: read_i64("page_size"),
            limit: read_i64("limit"),
            offset: read_i64("offset"),
        }
    }
}
/// Parameters of a gateway insert into a single table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewayInsertRequest {
/// Name of the table to insert into.
pub table_name: String,
/// JSON payload to insert. `from_body` reads it from the `insert_body` key,
/// falling back to `data`.
pub insert_body: Value,
/// Optional JSON payload taken from the `update_body` key.
// NOTE(review): presumably applied when the insert conflicts with an existing row — confirm.
#[serde(default)]
pub update_body: Option<Value>,
}
impl GatewayInsertRequest {
    /// Reads `table_name` from `body`.
    ///
    /// Returns `None` when the key is missing, is not a string, or contains
    /// only whitespace. The name is returned as sent (it is not trimmed).
    pub fn table_name_from_body(body: &Value) -> Option<String> {
        body.get("table_name")
            .and_then(Value::as_str)
            // Reject blank names before allocating the owned copy.
            .filter(|name| !name.trim().is_empty())
            .map(str::to_string)
    }

    /// Builds an insert request from a loosely-typed JSON body.
    ///
    /// Returns `None` when the table name is unusable or when neither
    /// `insert_body` nor `data` is present.
    pub fn from_body(body: &Value) -> Option<Self> {
        let table_name = Self::table_name_from_body(body)?;
        let insert_body = Self::insert_body_from_body(body)?;
        // An explicit JSON `null` means "no update body". This matches what the
        // derived `Deserialize` impl produces for an `Option<Value>` field, so
        // both construction paths agree.
        let update_body = body
            .get("update_body")
            .filter(|value| !value.is_null())
            .cloned();
        Some(Self {
            table_name,
            insert_body,
            update_body,
        })
    }

    /// Returns a copy of the insert payload: the `insert_body` key when
    /// present, otherwise the `data` key, otherwise `None`.
    pub fn insert_body_from_body(body: &Value) -> Option<Value> {
        body.get("insert_body")
            .or_else(|| body.get("data"))
            .cloned()
    }
}
/// Parameters of a gateway update against a single table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewayUpdateRequest {
/// Name of the table to update.
pub table_name: String,
/// Column/value conditions attached to the update; empty when omitted.
#[serde(default)]
pub conditions: Vec<GatewayRequestCondition>,
/// New values to write. `from_body` always stores a JSON object here, built
/// by `extract_update_payload`.
pub data: Value,
}
impl GatewayUpdateRequest {
    /// Builds an update request from a loosely-typed JSON body.
    ///
    /// Returns `None` when `table_name` is missing, not a string, or blank, or
    /// when `extract_update_payload` finds no fields to write. The table name
    /// is stored as sent (untrimmed).
    pub fn from_body(body: &Value, force_camel_case_to_snake_case: bool) -> Option<Self> {
        let raw_name = body.get("table_name")?.as_str()?;
        if raw_name.trim().is_empty() {
            return None;
        }

        let fields = extract_update_payload(body, force_camel_case_to_snake_case)?;

        Some(Self {
            table_name: raw_name.to_string(),
            conditions: parse_conditions_from_body(body),
            data: Value::Object(fields),
        })
    }
}
/// Parameters of a gateway delete against a single table.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewayDeleteRequest {
/// Name of the table to delete from.
pub table_name: String,
/// Identifier of the resource to delete.
// NOTE(review): presumably the primary-key value of the target row — confirm against the handler.
pub resource_id: String,
}
/// A gateway request that carries a single query string.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewaySqlRequest {
/// Query text carried by the request.
pub query: String,
}
/// Operator named in a `GatewayRpcFilter`.
///
/// Each variant serializes to, and deserializes from, the lowercase string in
/// its `rename` attribute.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum GatewayRpcFilterOperator {
#[serde(rename = "eq")]
Eq,
#[serde(rename = "neq")]
Neq,
#[serde(rename = "gt")]
Gt,
#[serde(rename = "gte")]
Gte,
#[serde(rename = "lt")]
Lt,
#[serde(rename = "lte")]
Lte,
#[serde(rename = "in")]
In,
#[serde(rename = "like")]
Like,
// Written out as `ilike`; `i_like` is also accepted when deserializing.
#[serde(rename = "ilike", alias = "i_like")]
ILike,
#[serde(rename = "is")]
Is,
}
/// A single filter attached to a `GatewayRpcRequest`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub struct GatewayRpcFilter {
/// Column the filter refers to.
pub column: String,
/// Operator to use with `column` and `value`.
pub operator: GatewayRpcFilterOperator,
/// Operand of the filter; takes `Value`'s default when omitted from the input.
#[serde(default)]
pub value: Value,
}
/// Ordering requested for a `GatewayRpcRequest`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct GatewayRpcOrder {
/// Column to order by.
pub column: String,
/// Sort direction flag; supplied by `default_rpc_order_ascending` when omitted.
#[serde(default = "default_rpc_order_ascending")]
pub ascending: bool,
}
/// Serde default for `GatewayRpcOrder::ascending`: orders are ascending unless
/// the input says otherwise.
fn default_rpc_order_ascending() -> bool {
    true
}
/// Parameters of a gateway RPC (function call) request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewayRpcRequest {
/// Name of the function to call; `function_name` is accepted as an input alias.
#[serde(alias = "function_name")]
pub function: String,
/// Schema of the function; supplied by `default_rpc_schema` when omitted.
#[serde(default = "default_rpc_schema")]
pub schema: String,
/// Arguments for the function; supplied by `default_rpc_args` when omitted.
#[serde(default = "default_rpc_args")]
pub args: Value,
/// Optional selection string.
// NOTE(review): presumably a column list applied to the function result — confirm.
#[serde(default)]
pub select: Option<String>,
/// Filters attached to the call; empty when omitted.
#[serde(default)]
pub filters: Vec<GatewayRpcFilter>,
/// Optional count mode, carried as a free-form string.
// NOTE(review): accepted values are not visible in this file — confirm with the consumer.
#[serde(default)]
pub count: Option<String>,
/// Optional row limit.
#[serde(default)]
pub limit: Option<i64>,
/// Optional row offset.
#[serde(default)]
pub offset: Option<i64>,
/// Optional ordering of the result.
#[serde(default)]
pub order: Option<GatewayRpcOrder>,
}
/// Serde default for `GatewayRpcRequest::schema`: the `public` schema.
fn default_rpc_schema() -> String {
    String::from("public")
}
/// Serde default for `GatewayRpcRequest::args`: an empty JSON object.
fn default_rpc_args() -> Value {
    let no_args = Map::new();
    Value::Object(no_args)
}
// String tags for `GatewayDeferredRequest::kind`.
// `GatewayDeferredRequest::for_query` stores the query tag.
// NOTE(review): the remaining tags are presumably passed to `for_request_body`
// by callers, one per request type — confirm.
pub const GATEWAY_DEFERRED_KIND_QUERY: &str = "gateway_query";
pub const GATEWAY_DEFERRED_KIND_FETCH: &str = "gateway_fetch";
pub const GATEWAY_DEFERRED_KIND_INSERT: &str = "gateway_insert";
pub const GATEWAY_DEFERRED_KIND_UPDATE: &str = "gateway_update";
pub const GATEWAY_DEFERRED_KIND_DELETE: &str = "gateway_delete";
pub const GATEWAY_DEFERRED_KIND_RPC: &str = "gateway_rpc";
/// Serializable record of a gateway request, tagged with a `kind` string.
///
/// Optional fields that are `None` are left out of the serialized output.
// NOTE(review): presumably queued for execution at a later time — confirm against the consumer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewayDeferredRequest {
/// Tag identifying the type of request (see the `GATEWAY_DEFERRED_KIND_*` constants).
pub kind: String,
/// Identifier of this request.
pub request_id: String,
/// Name of the client the request belongs to.
pub client_name: String,
/// Original JSON body; set by `for_request_body`.
#[serde(skip_serializing_if = "Option::is_none")]
pub request_body: Option<Value>,
/// Query text; set by `for_query`.
#[serde(skip_serializing_if = "Option::is_none")]
pub query: Option<String>,
/// Optional free-form reason; set by `with_reason`.
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
/// Optional timestamp; set by `with_requested_at_unix_ms`.
// NOTE(review): the name indicates Unix epoch milliseconds — confirm with the producer.
#[serde(skip_serializing_if = "Option::is_none")]
pub requested_at_unix_ms: Option<i64>,
}
impl GatewayDeferredRequest {
pub fn for_query(
request_id: impl Into<String>,
client_name: impl Into<String>,
query: impl Into<String>,
) -> Self {
Self {
kind: GATEWAY_DEFERRED_KIND_QUERY.to_string(),
request_id: request_id.into(),
client_name: client_name.into(),
request_body: None,
query: Some(query.into()),
reason: None,
requested_at_unix_ms: None,
}
}
pub fn for_request_body(
kind: impl Into<String>,
request_id: impl Into<String>,
client_name: impl Into<String>,
request_body: Value,
) -> Self {
Self {
kind: kind.into(),
request_id: request_id.into(),
client_name: client_name.into(),
request_body: Some(request_body),
query: None,
reason: None,
requested_at_unix_ms: None,
}
}
pub fn with_reason(mut self, reason: Option<impl Into<String>>) -> Self {
self.reason = reason.map(|value| value.into());
self
}
pub fn with_requested_at_unix_ms(mut self, requested_at_unix_ms: i64) -> Self {
self.requested_at_unix_ms = Some(requested_at_unix_ms);
self
}
pub fn query_text(&self) -> Option<String> {
if let Some(query) = self.query.as_ref() {
return Some(query.clone());
}
self.request_body
.as_ref()
.and_then(|body| body.get("query"))
.and_then(Value::as_str)
.map(str::to_string)
}
}
/// A query string together with the driver and database it is addressed to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewaySqlExecutionRequest {
/// Query text to execute.
pub query: String,
/// Name of the driver to use.
// NOTE(review): accepted driver names are not visible in this file — confirm.
pub driver: String,
/// Name of the target database.
pub db_name: String,
}
/// The concrete request carried by a `GatewayApiRequest`, one variant per
/// request type defined in this file.
#[derive(Debug, Clone)]
pub enum GatewayApiRequestPayload {
Fetch(GatewayFetchRequest),
Insert(GatewayInsertRequest),
Update(GatewayUpdateRequest),
Delete(GatewayDeleteRequest),
Sql(GatewaySqlRequest),
Rpc(GatewayRpcRequest),
}
/// Wrapper around a `GatewayApiRequestPayload`.
///
/// The field is private; values are created through the `From` conversions
/// implemented for each concrete request type.
#[derive(Debug, Clone)]
pub struct GatewayApiRequest {
// The wrapped request; exposed through `payload()` and `into_payload()`.
payload: GatewayApiRequestPayload,
}
impl GatewayApiRequest {
    /// Borrows the wrapped payload.
    pub fn payload(&self) -> &GatewayApiRequestPayload {
        let Self { payload } = self;
        payload
    }

    /// Consumes the request and returns the wrapped payload.
    pub fn into_payload(self) -> GatewayApiRequestPayload {
        let Self { payload } = self;
        payload
    }
}
impl From<GatewayFetchRequest> for GatewayApiRequest {
fn from(value: GatewayFetchRequest) -> Self {
Self {
payload: GatewayApiRequestPayload::Fetch(value),
}
}
}
impl From<GatewayInsertRequest> for GatewayApiRequest {
fn from(value: GatewayInsertRequest) -> Self {
Self {
payload: GatewayApiRequestPayload::Insert(value),
}
}
}
impl From<GatewayUpdateRequest> for GatewayApiRequest {
fn from(value: GatewayUpdateRequest) -> Self {
Self {
payload: GatewayApiRequestPayload::Update(value),
}
}
}
impl From<GatewayDeleteRequest> for GatewayApiRequest {
fn from(value: GatewayDeleteRequest) -> Self {
Self {
payload: GatewayApiRequestPayload::Delete(value),
}
}
}
impl From<GatewaySqlRequest> for GatewayApiRequest {
fn from(value: GatewaySqlRequest) -> Self {
Self {
payload: GatewayApiRequestPayload::Sql(value),
}
}
}
impl From<GatewayRpcRequest> for GatewayApiRequest {
fn from(value: GatewayRpcRequest) -> Self {
Self {
payload: GatewayApiRequestPayload::Rpc(value),
}
}
}
/// Execution metadata that can accompany a `GatewayRowsResponse`.
// NOTE(review): field meanings below are read from the names only — confirm with the producer.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewayRowsMeta {
/// Name of the backend.
pub backend: String,
/// Number of statements.
pub statement_count: usize,
/// Number of rows affected.
pub rows_affected: u64,
/// Number of rows returned.
pub returned_row_count: usize,
}
/// Response carrying a list of JSON rows plus optional metadata.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GatewayRowsResponse {
/// The returned rows, one JSON value each.
pub data: Vec<Value>,
/// Optional metadata; left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub meta: Option<GatewayRowsMeta>,
}
impl GatewayRowsResponse {
pub fn new(data: Vec<Value>) -> Self {
Self { data, meta: None }
}
pub fn with_meta(mut self, meta: GatewayRowsMeta) -> Self {
self.meta = Some(meta);
self
}
}
pub fn parse_columns_from_body(body: &Value) -> Vec<String> {
let Some(columns_value) = body.get("columns") else {
return Vec::new();
};
if let Some(array) = columns_value.as_array() {
return array
.iter()
.filter_map(|value| value.as_str().map(str::to_string))
.collect();
}
if let Some(csv) = columns_value.as_str() {
return csv
.split(',')
.map(str::trim)
.filter(|token| !token.is_empty())
.map(str::to_string)
.collect();
}
Vec::new()
}
/// Extracts the conditions listed under the `conditions` key of `body`.
///
/// Returns an empty vector when the key is missing or is not an array. An
/// entry is kept only when it has a string `eq_column` and an `eq_value` key
/// (whose value may be any JSON value); other entries are skipped.
pub fn parse_conditions_from_body(body: &Value) -> Vec<GatewayRequestCondition> {
    let Some(entries) = body.get("conditions").and_then(Value::as_array) else {
        return Vec::new();
    };

    let mut parsed = Vec::new();
    for entry in entries {
        let column = entry.get("eq_column").and_then(Value::as_str);
        let value = entry.get("eq_value");
        if let (Some(column), Some(value)) = (column, value) {
            parsed.push(GatewayRequestCondition {
                eq_column: column.to_string(),
                eq_value: value.clone(),
            });
        }
    }
    parsed
}
/// Collects the column → value pairs an update should write.
///
/// Sources are tried in this order, and the first one that yields at least
/// one field wins:
/// 1. `columns` — an array whose object entries are merged in order (later
///    entries overwrite earlier keys; non-object entries are ignored);
/// 2. `data` — a JSON object;
/// 3. `set` — a JSON object.
///
/// A source that is present but contributes nothing (for example an empty
/// `columns` array, or one holding only strings) does not hide a later
/// source. When `force_camel_case_to_snake_case` is set, every key is passed
/// through `normalize_column_name(_, true)`.
///
/// Returns `None` when no source provides any field.
pub fn extract_update_payload(
    body: &Value,
    force_camel_case_to_snake_case: bool,
) -> Option<Map<String, Value>> {
    let mut payload = Map::new();

    if let Some(entries) = body.get("columns").and_then(Value::as_array) {
        for fields in entries.iter().filter_map(Value::as_object) {
            merge_update_fields(&mut payload, fields, force_camel_case_to_snake_case);
        }
    }

    for key in ["data", "set"] {
        if !payload.is_empty() {
            break;
        }
        if let Some(fields) = body.get(key).and_then(Value::as_object) {
            merge_update_fields(&mut payload, fields, force_camel_case_to_snake_case);
        }
    }

    (!payload.is_empty()).then_some(payload)
}

/// Copies every field of `fields` into `target`, optionally normalising the
/// key to snake case; an existing key is overwritten.
fn merge_update_fields(
    target: &mut Map<String, Value>,
    fields: &Map<String, Value>,
    force_camel_case_to_snake_case: bool,
) {
    for (key, value) in fields {
        let column = if force_camel_case_to_snake_case {
            normalize_column_name(key, true)
        } else {
            key.clone()
        };
        target.insert(column, value.clone());
    }
}