#[allow(deprecated)]
use actix_web::HttpRequest;
use actix_web::http::StatusCode;
use actix_web::web;
use actix_web::web::Data;
use actix_web::{HttpResponse, Responder, get, post, web::Json};
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use reqwest::{Client, RequestBuilder, Response};
use rust_decimal::Decimal;
use rust_decimal::prelude::FromPrimitive;
use serde_json::{Number, Value, json};
use std::collections::{BTreeMap, HashMap, HashSet};
use std::str::FromStr;
use std::time::Instant;
use supabase_rs::SupabaseClient;
use tracing::{error, info};
mod conditions;
use crate::AppState;
use crate::api::cache::check::check_cache_control_and_get_response_v2;
use crate::api::cache::hydrate::hydrate_cache_and_return_json;
use crate::api::gateway::auth::{
authorize_gateway_request, read_right_for_resource, write_right_for_resource,
};
use crate::api::gateway::fetch::conditions::{RequestCondition, to_query_conditions};
use crate::api::headers::x_athena_client::x_athena_client;
use crate::api::headers::x_strip_nulls::get_x_strip_nulls;
use crate::api::headers::x_user_id::get_x_user_id;
use crate::data::parse::strip_nulls::strip_nulls_from_key;
use crate::drivers::postgresql::sqlx_driver::{fetch_rows_with_columns, update_rows};
use crate::drivers::supabase::{
PaginationOptions, fetch_multiple_conditions, fetch_multiple_conditions_with_client,
};
use crate::error::sqlx_parser::process_sqlx_error_with_context;
use crate::error::{ErrorCategory, ProcessedError, generate_trace_id};
use crate::parser::query_builder::Condition;
use crate::utils::format::{normalize_column_name, normalize_columns_csv, normalize_rows};
use crate::utils::request_logging::{LoggedRequest, log_operation_event, log_request};
/// Builds the 400 response returned when the `X-Athena-Client` header is
/// absent or empty; every gateway route requires this header.
fn missing_client_header_response() -> HttpResponse {
    let payload = json!({
        "status": "error",
        "code": "missing_client_header",
        "message": "X-Athena-Client header is required and cannot be empty",
    });
    HttpResponse::BadRequest().json(payload)
}
/// Bucket width used when grouping rows by a timestamp column during
/// post-processing (see `PostProcessingConfig`).
#[derive(Debug)]
#[doc(hidden)]
pub enum TimeGranularity {
    // Bucket label rendered as `%Y-%m-%d`.
    Day,
    // Bucket label rendered as `%Y-%m-%d %H:00`.
    Hour,
    // Bucket label rendered as `%Y-%m-%d %H:%M`.
    Minute,
}
impl FromStr for TimeGranularity {
type Err = String;
fn from_str(value: &str) -> Result<Self, Self::Err> {
match value.to_lowercase().as_str() {
"day" => Ok(TimeGranularity::Day),
"hour" => Ok(TimeGranularity::Hour),
"minute" => Ok(TimeGranularity::Minute),
other => Err(format!("Unsupported time_granularity '{}'", other)),
}
}
}
impl TimeGranularity {
    /// Renders `datetime` as the canonical bucket label for this
    /// granularity (coarser granularities zero out the finer fields).
    fn format_label(&self, datetime: DateTime<Utc>) -> String {
        let pattern = match self {
            TimeGranularity::Day => "%Y-%m-%d",
            TimeGranularity::Hour => "%Y-%m-%d %H:00",
            TimeGranularity::Minute => "%Y-%m-%d %H:%M",
        };
        datetime.format(pattern).to_string()
    }
}
/// How per-bucket sums are combined during post-processing.
#[derive(Debug, PartialEq, Eq)]
#[doc(hidden)]
pub enum AggregationStrategy {
    // Running total of per-bucket sums, accumulated in ascending label order.
    CumulativeSum,
}
impl FromStr for AggregationStrategy {
    type Err = String;

    /// Parses a case-insensitive strategy name; "cumulative_sum" is the
    /// only supported value.
    fn from_str(value: &str) -> Result<Self, Self::Err> {
        let lowered = value.to_lowercase();
        if lowered == "cumulative_sum" {
            Ok(AggregationStrategy::CumulativeSum)
        } else {
            Err(format!("Unsupported aggregation_strategy '{}'", lowered))
        }
    }
}
/// Optional post-processing directives extracted from a fetch request body.
#[derive(Debug)]
#[doc(hidden)]
pub struct PostProcessingConfig {
    // Column whose value buckets the rows; post-processing is skipped when
    // absent (see `apply_post_processing`).
    pub group_by: Option<String>,
    // When set, the `group_by` value is parsed as a datetime and bucketed
    // at this width.
    pub time_granularity: Option<TimeGranularity>,
    // Numeric column summed per bucket.
    pub aggregation_column: Option<String>,
    // Requires `aggregation_column`; currently only cumulative sum.
    pub aggregation_strategy: Option<AggregationStrategy>,
    // Skip duplicate values when summing a bucket.
    pub dedup_aggregation: bool,
}
impl PostProcessingConfig {
    /// Reads the optional post-processing keys from a request body.
    ///
    /// Missing or malformed values silently fall back to `None` / `false`;
    /// column names are normalized when `force_snake` is set. Unknown
    /// granularity/strategy names are ignored rather than rejected.
    #[doc(hidden)]
    pub fn from_body(body: Option<&Value>, force_snake: bool) -> Self {
        // Shared accessor for the string-typed keys.
        let read_str = |key: &str| body.and_then(|b| b.get(key)).and_then(Value::as_str);
        let group_by: Option<String> =
            read_str("group_by").map(|s| normalize_column_name(s, force_snake));
        let aggregation_column: Option<String> =
            read_str("aggregation_column").map(|s| normalize_column_name(s, force_snake));
        let time_granularity: Option<TimeGranularity> =
            read_str("time_granularity").and_then(|s| s.parse::<TimeGranularity>().ok());
        let aggregation_strategy: Option<AggregationStrategy> =
            read_str("aggregation_strategy").and_then(|s| s.parse::<AggregationStrategy>().ok());
        let dedup_aggregation: bool = body
            .and_then(|b| b.get("aggregation_dedup"))
            .and_then(Value::as_bool)
            .unwrap_or(false);
        PostProcessingConfig {
            group_by,
            time_granularity,
            aggregation_column,
            aggregation_strategy,
            dedup_aggregation,
        }
    }
}
/// Shared implementation behind `POST /gateway/data` and `POST /gateway/fetch`.
///
/// Flow: validate the `X-Athena-Client` header, read columns / pagination /
/// equality conditions from the JSON body, authorize the caller, consult the
/// response cache, fetch from the selected backend (per-client Postgres
/// pool, header-supplied Supabase credentials for `custom_supabase`, or the
/// configured Supabase client), hydrate the cache, and optionally attach a
/// `post_processing` section (grouping/aggregation) to the response.
pub(crate) async fn handle_fetch_data_route(
    req: HttpRequest,
    body: Option<Json<Value>>,
    app_state: Data<AppState>,
) -> HttpResponse {
    let operation_start: Instant = Instant::now();
    let start_time: Instant = Instant::now();
    // The client header selects the backend; reject early when missing.
    let client_name: String = x_athena_client(&req.clone());
    if client_name.is_empty() {
        return missing_client_header_response();
    }
    let force_camel_case_to_snake_case: bool = app_state.gateway_force_camel_case_to_snake_case;
    let auto_cast_uuid_filter_values_to_text =
        app_state.gateway_auto_cast_uuid_filter_values_to_text;
    let mut table_name: String = String::new();
    let mut current_page: i64 = 1;
    let mut page_size: i64 = 100;
    let mut offset: i64 = 0;
    let mut conditions: Vec<RequestCondition> = vec![];
    // Admin API key: kept only when it matches the configured admin secret,
    // so it can double as a fallback user id below.
    let apikey: Option<String> = req
        .headers()
        .get("apikey")
        .or_else(|| req.headers().get("x-api-key"))
        .and_then(|value| value.to_str().ok())
        .map(|s| s.to_string())
        .filter(|key| key == &std::env::var("SUITSBOOKS_API_ADMIN_KEY").unwrap_or_default());
    let user_id: String = get_x_user_id(&req)
        .or_else(|| apikey.clone())
        .unwrap_or_default();
    // NOTE(review): `limit` is resolved before `page_size` is read from the
    // body below, so a body-supplied `page_size` without an explicit `limit`
    // leaves `limit` at the default 100 — confirm this ordering is intended.
    let limit: i64 = match &body {
        Some(json_body) => json_body
            .get("limit")
            .and_then(Value::as_u64)
            .unwrap_or(page_size as u64) as i64,
        None => page_size,
    };
    // Selected columns: accepts a JSON array of strings or a single
    // comma-separated string; defaults to "*".
    let mut columns_vec: Vec<String> = vec![];
    if let Some(ref json_body) = body
        && let Some(cols_val) = json_body.get("columns")
    {
        if let Some(arr) = cols_val.as_array() {
            columns_vec = arr
                .iter()
                .filter_map(|v| v.as_str().map(|s| s.to_string()))
                .collect();
        } else if let Some(s) = cols_val.as_str() {
            columns_vec = s
                .split(',')
                .map(|p| p.trim().to_string())
                .filter(|s| !s.is_empty())
                .collect();
        }
    }
    if columns_vec.is_empty() {
        columns_vec.push("*".to_string());
    }
    if force_camel_case_to_snake_case {
        columns_vec = columns_vec
            .into_iter()
            .map(|col| normalize_column_name(&col, true))
            .collect();
    }
    let strip_nulls: bool = match get_x_strip_nulls(&req) {
        Some(value) => value.to_lowercase() == "true",
        None => false,
    };
    let post_processing_config: PostProcessingConfig =
        PostProcessingConfig::from_body(body.as_deref(), force_camel_case_to_snake_case);
    // Pagination overrides from the body.
    if let Some(ref json_body) = body {
        if let Some(page) = json_body.get("current_page").and_then(Value::as_u64) {
            current_page = page as i64;
        }
        if let Some(size) = json_body.get("page_size").and_then(Value::as_u64) {
            page_size = size as i64;
        }
        if let Some(off) = json_body.get("offset").and_then(Value::as_u64) {
            offset = off as i64;
        }
    }
    // Validates a `room_id` condition value: integers or numeric strings
    // only; wildcards and empty strings are rejected.
    fn parse_room_id_value(value: &Value) -> Result<i64, String> {
        match value {
            Value::Number(num) => num
                .as_i64()
                .ok_or_else(|| "room_id must be an integer".to_string()),
            Value::String(text) => {
                let trimmed = text.trim();
                if trimmed == "*" {
                    return Err("room_id wildcard '*' is not allowed".to_string());
                }
                if trimmed.is_empty() {
                    return Err("room_id must not be empty".to_string());
                }
                trimmed
                    .parse::<i64>()
                    .map_err(|_| "room_id must be numeric".to_string())
            }
            _ => Err("room_id must be numeric".to_string()),
        }
    }
    if let Some(ref json_body) = body {
        if let Some(name) = json_body.get("table_name").and_then(Value::as_str) {
            table_name = name.to_string();
        }
        // Equality conditions; `room_id` (pre- or post-normalization) gets
        // extra validation and is coerced to a JSON number.
        if let Some(additional_conditions) = json_body.get("conditions").and_then(|c| c.as_array())
        {
            for condition in additional_conditions {
                if let Some(eq_column) = condition.get("eq_column").and_then(Value::as_str) {
                    let eq_column_str = eq_column.to_string();
                    let normalized_for_validation: String =
                        normalize_column_name(eq_column, force_camel_case_to_snake_case);
                    let eq_value_raw = match condition.get("eq_value") {
                        Some(value) => value.clone(),
                        None => {
                            if normalized_for_validation == "room_id" || eq_column_str == "roomId" {
                                return HttpResponse::BadRequest().json(json!({
                                    "error": "room_id is required and must be numeric"
                                }));
                            }
                            continue;
                        }
                    };
                    let eq_value: Value =
                        if normalized_for_validation == "room_id" || eq_column_str == "roomId" {
                            match parse_room_id_value(&eq_value_raw) {
                                Ok(room_id) => Value::Number(Number::from(room_id)),
                                Err(err_msg) => {
                                    return HttpResponse::BadRequest().json(json!({
                                        "error": err_msg
                                    }));
                                }
                            }
                        } else {
                            eq_value_raw
                        };
                    conditions.push(RequestCondition::new(eq_column_str, eq_value));
                }
            }
        }
    } else {
        // No body: still authorize + log the attempt before rejecting, so
        // failures are recorded consistently with successful requests.
        let auth = authorize_gateway_request(
            &req,
            app_state.get_ref(),
            Some(&client_name),
            vec![read_right_for_resource(None)],
        )
        .await;
        let _logged_request: LoggedRequest = log_request(
            req.clone(),
            Some(app_state.get_ref()),
            Some(auth.request_id.clone()),
            Some(&auth.log_context),
        );
        if let Some(resp) = auth.response {
            return resp;
        }
        return HttpResponse::BadRequest().json(json!({
            "error": "table_name is required"
        }));
    }
    let auth = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![read_right_for_resource(Some(&table_name))],
    )
    .await;
    let logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(resp) = auth.response {
        return resp;
    }
    if table_name.is_empty() {
        return HttpResponse::BadRequest().json(json!({
            "error": "table_name is required"
        }));
    }
    // Deterministic condition order so equivalent requests share a cache key.
    conditions.sort_by(|a, b| a.eq_column.cmp(&b.eq_column));
    let first_eq_column: &str = conditions.first().map_or("_", |c| &c.eq_column);
    // Cache key: human-readable prefix plus a short hash of the full query.
    let hash_input: Value = json!({
        "columns": columns_vec,
        "conditions": conditions.iter().map(|c| json!({
            "eq_column": c.eq_column,
            "eq_value": c.eq_value.clone()
        })).collect::<Vec<_>>(),
        "limit": limit,
        "strip_nulls": strip_nulls,
        "client": client_name,
    });
    let hash_str: String = sha256::digest(serde_json::to_string(&hash_input).unwrap_or_default());
    let short_hash: &str = &hash_str[..8];
    let hashed_cache_key: String = format!(
        "{}:{}:{}:{}:{}:{}",
        table_name,
        first_eq_column,
        columns_vec.join(","),
        limit,
        strip_nulls,
        short_hash
    );
    let cache_result: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, app_state.clone(), &hashed_cache_key).await;
    match cache_result {
        Some(cached_response) => {
            return cached_response;
        }
        None => {
            info!(cache_key = %hashed_cache_key, "cache miss (POST gateway fetch)");
        }
    }
    let conditions_json: Vec<Value> = conditions
        .iter()
        .map(|c| {
            json!({
                "eq_column": c.eq_column,
                "eq_value": c.eq_value.clone()
            })
        })
        .collect();
    let columns_refs: Vec<&str> = columns_vec.iter().map(|s| s.as_str()).collect();
    let pg_conditions: Vec<Condition> = to_query_conditions(
        &conditions[..],
        force_camel_case_to_snake_case,
        auto_cast_uuid_filter_values_to_text,
    );
    // Page numbers below 1 are clamped before computing the row offset.
    let page_offset: i64 = if current_page < 1 { 1 } else { current_page };
    let calculated_offset: i64 = (page_offset - 1) * page_size + offset;
    // Backend selection: dedicated Postgres pool > caller-supplied Supabase
    // credentials (`custom_supabase`) > configured Supabase client.
    let data_result: Result<Vec<Value>, String>;
    if let Some(pool) = app_state.pg_registry.get_pool(&client_name) {
        let fetch_result: Result<Vec<Value>, anyhow::Error> = fetch_rows_with_columns(
            &pool,
            &table_name,
            &columns_refs,
            &pg_conditions,
            limit,
            calculated_offset,
        )
        .await;
        data_result = match fetch_result {
            Ok(rows) => {
                info!(
                    backend = "postgres",
                    row_count = rows.len(),
                    "fetch_rows_with_columns finished"
                );
                Ok(rows)
            }
            Err(err) => {
                // sqlx errors are serialized to structured JSON (code +
                // trace id) so the error branch below can forward them.
                if let Some(sqlx_err) = err.downcast_ref::<sqlx::Error>() {
                    let processed = process_sqlx_error_with_context(sqlx_err, Some(&table_name));
                    error!(
                        backend = "postgres",
                        error_code = %processed.error_code,
                        trace_id = %processed.trace_id,
                        "fetch_rows_with_columns failed"
                    );
                    Err(processed.to_json().to_string())
                } else {
                    error!(
                        backend = "postgres",
                        error = %err,
                        "fetch_rows_with_columns failed"
                    );
                    Err(err.to_string())
                }
            }
        };
    } else if client_name == "custom_supabase" {
        // Caller supplies its own Supabase project via request headers.
        let supabase_url: Option<String> = req
            .headers()
            .get("x-supabase-url")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());
        let supabase_key: Option<String> = req
            .headers()
            .get("x-supabase-key")
            .and_then(|v| v.to_str().ok())
            .map(|s| s.to_string());
        match (supabase_url, supabase_key) {
            (Some(url), Some(key)) => match SupabaseClient::new(url, key) {
                Ok(client) => {
                    data_result = fetch_multiple_conditions_with_client(
                        client,
                        table_name.clone(),
                        columns_refs,
                        Some(conditions_json),
                        PaginationOptions {
                            limit,
                            current_page,
                            page_size,
                            offset,
                        },
                    )
                    .await;
                    match &data_result {
                        Ok(rows) => info!(
                            backend = "custom_supabase",
                            row_count = rows.len(),
                            "custom Supabase fetch finished"
                        ),
                        Err(e) => info!(
                            backend = "custom_supabase",
                            error = %e,
                            "custom Supabase fetch failed"
                        ),
                    }
                }
                Err(e) => {
                    error!(
                        backend = "custom_supabase",
                        error = %e,
                        "failed to create Supabase client with provided headers"
                    );
                    data_result = Err(format!("Failed to construct Supabase client: {:?}", e));
                }
            },
            _ => {
                error!(
                    backend = "custom_supabase",
                    "missing supabase credentials in headers"
                );
                data_result = Err("Missing x-supabase-url or x-supabase-key headers".to_string());
            }
        }
    } else {
        data_result = fetch_multiple_conditions(
            table_name.clone(),
            columns_refs,
            Some(conditions_json),
            PaginationOptions {
                limit,
                current_page,
                page_size,
                offset,
            },
            &client_name,
        )
        .await;
        match &data_result {
            Ok(rows) => info!(
                backend = "supabase",
                row_count = rows.len(),
                "Supabase fetch_multiple_conditions finished"
            ),
            Err(e) => info!(
                backend = "supabase",
                error = %e,
                "Supabase fetch_multiple_conditions failed"
            ),
        }
    }
    // Hydrate the cache with normalized rows before building the response.
    if let Ok(data) = &data_result {
        let normalized_rows: Vec<Value> = normalize_rows(data, force_camel_case_to_snake_case);
        hydrate_cache_and_return_json(
            app_state.clone(),
            hashed_cache_key.clone(),
            vec![json!({"data": normalized_rows.clone()})],
        )
        .await;
    } else {
        error!("Failed to rehydrate cache due to data fetch error");
    }
    match data_result {
        Ok(data) => {
            let normalized_rows: Vec<Value> = normalize_rows(&data, force_camel_case_to_snake_case);
            let mut data: Value = json!({ "data": normalized_rows, "cache_key": hashed_cache_key });
            if strip_nulls {
                let data_stripped_result: Option<Value> =
                    strip_nulls_from_key(&mut data, "data").await;
                if let Some(data_stripped) = data_stripped_result {
                    data = data_stripped.clone();
                } else {
                    error!("Failed to strip nulls from data");
                    return HttpResponse::InternalServerError().json(json!({
                        "error": "Failed to strip nulls from data"
                    }
                    ));
                }
            }
            // Optional grouping/aggregation over the (possibly stripped)
            // rows; attached under "post_processing" when configured.
            let row_snapshot: Vec<Value> = data
                .get("data")
                .and_then(|v| v.as_array())
                .cloned()
                .unwrap_or_default();
            match apply_post_processing(&row_snapshot, &post_processing_config) {
                Ok(Some(post_processing)) => {
                    data["post_processing"] = post_processing;
                }
                Ok(None) => {}
                Err(err) => {
                    error!("Post-processing error: {}", err);
                    return HttpResponse::BadRequest().json(json!({
                        "error": format!("Post-processing failure: {}", err),
                        "cache_key": hashed_cache_key,
                    }));
                }
            }
            let _post_processing_applied: bool = data.get("post_processing").is_some();
            log_operation_event(
                Some(app_state.get_ref()),
                &logged_request,
                "fetch",
                Some(&table_name),
                operation_start.elapsed().as_millis(),
                StatusCode::OK,
                Some(json!({
                    "cache_key": hashed_cache_key,
                    "row_count": normalized_rows.len()
                })),
            );
            HttpResponse::Ok().json(data)
        }
        Err(err) => {
            // Errors from the Postgres path may already be structured JSON
            // (code + trace_id); forward those with their embedded status.
            if let Ok(error_json) = serde_json::from_str::<Value>(&err) {
                if error_json.get("code").is_some() && error_json.get("trace_id").is_some() {
                    let status = error_json
                        .get("status_code")
                        .and_then(Value::as_u64)
                        .and_then(|code| StatusCode::from_u16(code as u16).ok())
                        .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR);
                    error!(
                        table = %table_name,
                        client = %client_name,
                        user_id = %user_id,
                        cache_key = %hashed_cache_key,
                        duration_ms = %start_time.elapsed().as_millis(),
                        error_code = %error_json["code"].as_str().unwrap_or("unknown"),
                        trace_id = %error_json["trace_id"].as_str().unwrap_or("unknown"),
                        "fetch POST failed with processed error"
                    );
                    log_operation_event(
                        Some(app_state.get_ref()),
                        &logged_request,
                        "fetch",
                        Some(&table_name),
                        operation_start.elapsed().as_millis(),
                        status,
                        Some(json!({
                            "cache_key": hashed_cache_key,
                            "error_code": error_json["code"],
                            "message": error_json["message"]
                        })),
                    );
                    HttpResponse::build(status).json(error_json)
                } else {
                    error!(
                        table = %table_name,
                        client = %client_name,
                        user_id = %user_id,
                        cache_key = %hashed_cache_key,
                        duration_ms = %start_time.elapsed().as_millis(),
                        error = %err,
                        "fetch POST failed"
                    );
                    log_operation_event(
                        Some(app_state.get_ref()),
                        &logged_request,
                        "fetch",
                        Some(&table_name),
                        operation_start.elapsed().as_millis(),
                        StatusCode::INTERNAL_SERVER_ERROR,
                        Some(json!({
                            "cache_key": hashed_cache_key,
                            "error": err
                        })),
                    );
                    HttpResponse::InternalServerError().json(
                        json!({"status": "error", "message": "Failed to fetch data", "error": err, "cache_key": hashed_cache_key}),
                    )
                }
            } else if err.starts_with("Unknown client name:") {
                HttpResponse::BadRequest().json(json!({
                    "status": "error",
                    "code": "unknown_client",
                    "message": "X-Athena-Client does not match a configured client.",
                    "details": {
                        "client": client_name,
                        "cache_key": hashed_cache_key
                    }
                }))
            } else {
                error!(
                    table = %table_name,
                    client = %client_name,
                    user_id = %user_id,
                    cache_key = %hashed_cache_key,
                    duration_ms = %start_time.elapsed().as_millis(),
                    error = %err,
                    "fetch POST failed"
                );
                log_operation_event(
                    Some(app_state.get_ref()),
                    &logged_request,
                    "fetch",
                    Some(&table_name),
                    operation_start.elapsed().as_millis(),
                    StatusCode::INTERNAL_SERVER_ERROR,
                    Some(json!({
                        "cache_key": hashed_cache_key,
                        "error": err
                    })),
                );
                HttpResponse::InternalServerError().json(
                    json!({"status": "error", "message": "Failed to fetch data", "error": err, "cache_key": hashed_cache_key}),
                )
            }
        }
    }
}
/// Groups `rows` by the configured column and, optionally, sums an
/// aggregation column per bucket (cumulatively when requested).
///
/// Returns `Ok(None)` when no `group_by` is configured, otherwise a
/// `{"grouped": [...]}` payload in ascending bucket-label order. Errors
/// when a strategy is set without an aggregation column, or when a row is
/// missing / non-conforming for the configured columns.
#[doc(hidden)]
pub fn apply_post_processing(
    rows: &[Value],
    config: &PostProcessingConfig,
) -> Result<Option<Value>, String> {
    let Some(group_by_column) = config.group_by.as_ref() else {
        return Ok(None);
    };
    if config.aggregation_strategy.is_some() && config.aggregation_column.is_none() {
        return Err("aggregation_strategy requires aggregation_column".to_string());
    }
    // BTreeMap keeps buckets sorted by label, which fixes the order used
    // for the cumulative sum below.
    let mut buckets: BTreeMap<String, Vec<Value>> = BTreeMap::new();
    for row in rows {
        let label =
            extract_group_label(row, group_by_column, config.time_granularity.as_ref())?;
        buckets.entry(label).or_default().push(row.clone());
    }
    let mut running_total = Decimal::ZERO;
    let mut grouped: Vec<Value> = Vec::with_capacity(buckets.len());
    for (label, bucket_rows) in buckets {
        let base_sum = match &config.aggregation_column {
            Some(column) => Some(sum_bucket(&bucket_rows, column, config.dedup_aggregation)?),
            None => None,
        };
        let aggregation_value = match (config.aggregation_strategy.as_ref(), base_sum) {
            (Some(AggregationStrategy::CumulativeSum), Some(sum)) => {
                running_total += sum;
                Some(running_total)
            }
            (_, Some(sum)) => Some(sum),
            _ => None,
        };
        // Decimal values are emitted as strings to avoid JSON float loss.
        let aggregation_payload = aggregation_value.map(|value| Value::String(value.to_string()));
        grouped.push(json!({
            "label": label,
            "rows": bucket_rows,
            "aggregation": aggregation_payload
        }));
    }
    Ok(Some(json!({ "grouped": grouped })))
}
/// Derives the bucket label for `row`: a formatted datetime when a time
/// granularity is configured, otherwise the raw column value.
fn extract_group_label(
    row: &Value,
    column: &str,
    granularity: Option<&TimeGranularity>,
) -> Result<String, String> {
    let value = match row.get(column) {
        Some(v) => v,
        None => return Err(format!("Missing group_by column '{}'", column)),
    };
    match granularity {
        Some(gran) => parse_datetime_value(value).map(|dt| gran.format_label(dt)),
        // Strings are used verbatim; any other JSON value falls back to its
        // serialized representation.
        None => Ok(value
            .as_str()
            .map(str::to_string)
            .unwrap_or_else(|| value.to_string())),
    }
}
/// Parses a JSON value into a UTC datetime.
///
/// Accepts RFC 3339 strings, naive `"%Y-%m-%d %H:%M:%S"` strings (assumed
/// UTC), and unix timestamps as integers or floats (fractional seconds kept
/// as nanoseconds).
///
/// # Errors
/// Returns a descriptive message for unparseable strings, out-of-range
/// timestamps, and non-string/non-number values.
fn parse_datetime_value(value: &Value) -> Result<DateTime<Utc>, String> {
    match value {
        Value::String(text) => DateTime::parse_from_rfc3339(text)
            .map(|dt| dt.with_timezone(&Utc))
            .or_else(|_| {
                // Naive timestamps carry no offset; treat them as UTC.
                // `and_utc` replaces the deprecated `DateTime::<Utc>::from_utc`.
                NaiveDateTime::parse_from_str(text, "%Y-%m-%d %H:%M:%S")
                    .map(|naive| naive.and_utc())
            })
            .map_err(|_| format!("Failed to parse datetime string: '{}'", text)),
        Value::Number(number) => {
            if let Some(secs) = number.as_i64() {
                // Integer path avoids the f64 round-trip, which would lose
                // precision for magnitudes beyond 2^53.
                Utc.timestamp_opt(secs, 0)
                    .single()
                    .ok_or_else(|| format!("Invalid unix timestamp: {}", secs))
            } else if let Some(timestamp) = number.as_f64() {
                let secs: i64 = timestamp.trunc() as i64;
                // Negative fractions saturate to 0 nanoseconds under `as u32`.
                let nanos: u32 = (timestamp.fract() * 1_000_000_000f64) as u32;
                Utc.timestamp_opt(secs, nanos)
                    .single()
                    .ok_or_else(|| format!("Invalid unix timestamp: {}", timestamp))
            } else {
                Err("Numeric timestamp must be integer or float".to_string())
            }
        }
        _ => {
            Err("Value must be a string (RFC3339/datetime) or number (unix timestamp)".to_string())
        }
    }
}
/// Sums the numeric `column` across `rows`, optionally skipping repeated
/// values when `dedup` is set.
///
/// # Errors
/// Fails when any row is missing the column or holds non-numeric data.
fn sum_bucket(rows: &[Value], column: &str, dedup: bool) -> Result<Decimal, String> {
    let mut total: Decimal = Decimal::ZERO;
    let mut seen: HashSet<Decimal> = HashSet::new();
    for row in rows {
        let value: &Value = row
            .get(column)
            .ok_or_else(|| format!("Missing aggregation column '{}'", column))?;
        let decimal: Decimal = value_to_decimal(value)
            .ok_or_else(|| format!("Aggregation column '{}' contains non-numeric data", column))?;
        // `insert` returns false for duplicates — one lookup instead of the
        // previous contains-then-insert pair.
        if dedup && !seen.insert(decimal) {
            continue;
        }
        total += decimal;
    }
    Ok(total)
}
/// Best-effort conversion of a JSON value to a `Decimal`.
///
/// Integers convert exactly (i64 first, then u64), floats go through
/// `Decimal::from_f64`, and strings are parsed; everything else is `None`.
fn value_to_decimal(value: &Value) -> Option<Decimal> {
    match value {
        Value::Number(num) => num
            .as_i64()
            .map(Decimal::from)
            .or_else(|| num.as_u64().map(Decimal::from))
            .or_else(|| num.as_f64().and_then(Decimal::from_f64)),
        Value::String(text) => Decimal::from_str(text).ok(),
        _ => None,
    }
}
/// `POST /gateway/data` — delegates to `handle_fetch_data_route`, which
/// implements the full fetch behavior.
#[post("/gateway/data")]
pub async fn fetch_data_route(
    req: HttpRequest,
    body: Option<Json<Value>>,
    app_state: Data<AppState>,
) -> HttpResponse {
    handle_fetch_data_route(req, body, app_state).await
}
/// `POST /gateway/fetch` — alternate path for the same handler as
/// `/gateway/data`; this is the route `get_data_route` proxies to.
#[post("/gateway/fetch")]
pub async fn proxy_fetch_data_route(
    req: HttpRequest,
    body: Option<Json<Value>>,
    app_state: Data<AppState>,
) -> HttpResponse {
    handle_fetch_data_route(req, body, app_state).await
}
/// Extracts the SET payload for an update from the request body.
///
/// Accepts, in priority order: `columns` (array of objects), `data`
/// (object), or `set` (object). Keys are snake-cased when
/// `force_camel_case_to_snake_case` is set. Returns `None` when no payload
/// key is present or the resulting map is empty.
fn build_update_payload_from_body(
    json_body: &Value,
    force_camel_case_to_snake_case: bool,
) -> Option<serde_json::Map<String, Value>> {
    // Single normalization point for all three body shapes (previously
    // duplicated in each branch).
    let normalize_key = |k: &str| {
        if force_camel_case_to_snake_case {
            normalize_column_name(k, true)
        } else {
            k.to_string()
        }
    };
    let mut payload = serde_json::Map::new();
    if let Some(cols) = json_body.get("columns").and_then(Value::as_array) {
        for obj in cols {
            if let Some(map) = obj.as_object() {
                for (k, v) in map {
                    payload.insert(normalize_key(k), v.clone());
                }
            }
        }
    } else if let Some(data) = json_body.get("data").and_then(Value::as_object) {
        for (k, v) in data {
            payload.insert(normalize_key(k), v.clone());
        }
    } else if let Some(set) = json_body.get("set").and_then(Value::as_object) {
        for (k, v) in set {
            payload.insert(normalize_key(k), v.clone());
        }
    } else {
        return None;
    }
    (!payload.is_empty()).then_some(payload)
}
/// Implementation behind `POST /gateway/update`.
///
/// Requires a JSON body with `table_name`, an update payload (`columns`,
/// `data`, or `set` — see `build_update_payload_from_body`), and at least
/// one equality condition. Updates run only against a client-specific
/// Postgres pool; the whole response cache is invalidated on success.
pub(crate) async fn handle_gateway_update_route(
    req: HttpRequest,
    body: Option<Json<Value>>,
    app_state: Data<AppState>,
) -> HttpResponse {
    let operation_start: Instant = Instant::now();
    let client_name: String = x_athena_client(&req.clone());
    if client_name.is_empty() {
        return missing_client_header_response();
    }
    let force_camel_case_to_snake_case: bool = app_state.gateway_force_camel_case_to_snake_case;
    let auto_cast_uuid_filter_values_to_text =
        app_state.gateway_auto_cast_uuid_filter_values_to_text;
    // A body is mandatory; authorize + log the attempt before rejecting so
    // failures are still recorded.
    let json_body = match &body {
        Some(b) => b,
        None => {
            let auth = authorize_gateway_request(
                &req,
                app_state.get_ref(),
                Some(&client_name),
                vec![write_right_for_resource(None)],
            )
            .await;
            let _logged_request: LoggedRequest = log_request(
                req.clone(),
                Some(app_state.get_ref()),
                Some(auth.request_id.clone()),
                Some(&auth.log_context),
            );
            if let Some(resp) = auth.response {
                return resp;
            }
            return HttpResponse::BadRequest().json(json!({
                "error": "request body is required for /gateway/update"
            }));
        }
    };
    let table_name: String = json_body
        .get("table_name")
        .and_then(Value::as_str)
        .map(String::from)
        .unwrap_or_default();
    // Authorization is scoped to the target table when one was provided.
    let auth = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![write_right_for_resource(if table_name.is_empty() {
            None
        } else {
            Some(&table_name)
        })],
    )
    .await;
    let logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(resp) = auth.response {
        return resp;
    }
    if table_name.is_empty() {
        return HttpResponse::BadRequest().json(json!({
            "error": "table_name is required"
        }));
    }
    let set_payload_map = match build_update_payload_from_body(
        json_body,
        force_camel_case_to_snake_case,
    ) {
        Some(m) => m,
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": "update payload required: provide 'columns' (array of objects with column names and values), or 'data' / 'set' object"
            }));
        }
    };
    let set_payload: Value = Value::Object(set_payload_map);
    // Validates a `room_id` condition value: integers or numeric strings
    // only; wildcards and empty strings are rejected.
    fn parse_room_id_value(value: &Value) -> Result<i64, String> {
        match value {
            Value::Number(num) => num
                .as_i64()
                .ok_or_else(|| "room_id must be an integer".to_string()),
            Value::String(text) => {
                let trimmed = text.trim();
                if trimmed == "*" {
                    return Err("room_id wildcard '*' is not allowed".to_string());
                }
                if trimmed.is_empty() {
                    return Err("room_id must not be empty".to_string());
                }
                trimmed
                    .parse::<i64>()
                    .map_err(|_| "room_id must be numeric".to_string())
            }
            _ => Err("room_id must be numeric".to_string()),
        }
    }
    // Equality conditions; `room_id` gets extra validation and is coerced
    // to a JSON number.
    let mut conditions: Vec<RequestCondition> = vec![];
    if let Some(additional_conditions) = json_body.get("conditions").and_then(|c| c.as_array()) {
        for condition in additional_conditions {
            let eq_column = match condition.get("eq_column").and_then(Value::as_str) {
                Some(c) => c.to_string(),
                None => continue,
            };
            let normalized_for_validation =
                normalize_column_name(&eq_column, force_camel_case_to_snake_case);
            let eq_value_raw = match condition.get("eq_value") {
                Some(v) => v.clone(),
                None => {
                    if normalized_for_validation == "room_id" || eq_column == "roomId" {
                        return HttpResponse::BadRequest().json(json!({
                            "error": "room_id is required and must be numeric"
                        }));
                    }
                    continue;
                }
            };
            let eq_value = if normalized_for_validation == "room_id" || eq_column == "roomId" {
                match parse_room_id_value(&eq_value_raw) {
                    Ok(room_id) => Value::Number(Number::from(room_id)),
                    Err(err_msg) => {
                        return HttpResponse::BadRequest().json(json!({ "error": err_msg }));
                    }
                }
            } else {
                eq_value_raw
            };
            conditions.push(RequestCondition::new(eq_column, eq_value));
        }
    }
    // Refuse unconditioned updates (they would touch every row).
    if conditions.is_empty() {
        return HttpResponse::BadRequest().json(json!({
            "error": "at least one condition is required for update (e.g. eq_column / eq_value)"
        }));
    }
    conditions.sort_by(|a, b| a.eq_column.cmp(&b.eq_column));
    // Updates are Postgres-only; there is no Supabase fallback here.
    let pool = match app_state.pg_registry.get_pool(&client_name) {
        Some(p) => p,
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": format!("Postgres client '{}' is not configured", client_name)
            }));
        }
    };
    let pg_conditions: Vec<Condition> = to_query_conditions(
        &conditions[..],
        force_camel_case_to_snake_case,
        auto_cast_uuid_filter_values_to_text,
    );
    let update_result = update_rows(&pool, &table_name, &pg_conditions, &set_payload).await;
    let updated_rows = match update_result {
        Ok(rows) => rows,
        Err(err) => {
            // sqlx errors get a structured code/trace-id response; anything
            // else becomes a generic internal ProcessedError.
            if let Some(sqlx_err) = err.downcast_ref::<sqlx::Error>() {
                let processed = process_sqlx_error_with_context(sqlx_err, Some(&table_name));
                error!(
                    error_code = %processed.error_code,
                    trace_id = %processed.trace_id,
                    "gateway update_rows failed"
                );
                log_operation_event(
                    Some(app_state.get_ref()),
                    &logged_request,
                    "gateway_update",
                    Some(&table_name),
                    operation_start.elapsed().as_millis(),
                    processed.status_code,
                    Some(json!({
                        "error_code": processed.error_code,
                        "trace_id": processed.trace_id,
                    })),
                );
                return HttpResponse::build(processed.status_code).json(processed.to_json());
            }
            let processed = ProcessedError::new(
                ErrorCategory::Internal,
                StatusCode::INTERNAL_SERVER_ERROR,
                "update_execution_error",
                "Failed to update rows due to an internal gateway error.",
                generate_trace_id(),
            )
            .with_metadata("table", json!(table_name))
            .with_metadata("client", json!(client_name))
            .with_metadata("reason", json!(err.to_string()));
            error!(
                error = %err,
                error_code = %processed.error_code,
                trace_id = %processed.trace_id,
                "gateway update_rows failed"
            );
            log_operation_event(
                Some(app_state.get_ref()),
                &logged_request,
                "gateway_update",
                Some(&table_name),
                operation_start.elapsed().as_millis(),
                processed.status_code,
                Some(json!({
                    "error_code": processed.error_code,
                    "trace_id": processed.trace_id,
                })),
            );
            return HttpResponse::build(processed.status_code).json(processed.to_json());
        }
    };
    // Coarse but safe: drop the entire cache, since any cached fetch may
    // now be stale.
    app_state.cache.invalidate_all();
    log_operation_event(
        Some(app_state.get_ref()),
        &logged_request,
        "gateway_update",
        Some(&table_name),
        operation_start.elapsed().as_millis(),
        StatusCode::OK,
        Some(json!({ "updated_count": updated_rows.len() })),
    );
    HttpResponse::Ok().json(json!({
        "data": updated_rows
    }))
}
/// `POST /gateway/update` — delegates to `handle_gateway_update_route`,
/// which implements the full update behavior.
#[post("/gateway/update")]
pub async fn gateway_update_route(
    req: HttpRequest,
    body: Option<Json<Value>>,
    app_state: Data<AppState>,
) -> HttpResponse {
    handle_gateway_update_route(req, body, app_state).await
}
/// `GET /data` — legacy query-string fetch endpoint.
///
/// Translates the query parameters into a `POST /gateway/fetch` body and
/// proxies it over localhost (port 4052), forwarding auth-related headers.
/// Responses are cached under both a legacy readable key and a
/// hash-suffixed key.
#[get("/data")]
pub async fn get_data_route(
    req: HttpRequest,
    query: web::Query<HashMap<String, String>>,
    app_state: Data<AppState>,
) -> impl Responder {
    let start_time: Instant = Instant::now();
    let client_name: String = x_athena_client(&req.clone());
    if client_name.is_empty() {
        return missing_client_header_response();
    }
    let force_camel_case_to_snake_case: bool = app_state.gateway_force_camel_case_to_snake_case;
    // `view` is mandatory; authorize + log the attempt before rejecting so
    // the failure is still recorded.
    let view: String = match query.get("view") {
        Some(v) => v.clone(),
        None => {
            let auth = authorize_gateway_request(
                &req,
                app_state.get_ref(),
                Some(&client_name),
                vec![read_right_for_resource(None)],
            )
            .await;
            let _logged_request: LoggedRequest = log_request(
                req.clone(),
                Some(app_state.get_ref()),
                Some(auth.request_id.clone()),
                Some(&auth.log_context),
            );
            if let Some(resp) = auth.response {
                return resp;
            }
            return HttpResponse::BadRequest().json(json!({
                "error": "Missing required parameter: view"
            }));
        }
    };
    let auth = authorize_gateway_request(
        &req,
        app_state.get_ref(),
        Some(&client_name),
        vec![read_right_for_resource(Some(&view))],
    )
    .await;
    let logged_request: LoggedRequest = log_request(
        req.clone(),
        Some(app_state.get_ref()),
        Some(auth.request_id.clone()),
        Some(&auth.log_context),
    );
    if let Some(resp) = auth.response {
        return resp;
    }
    // This endpoint supports exactly one equality condition.
    let eq_column_raw: String = match query.get("eq_column") {
        Some(c) => c.clone(),
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": "Missing required parameter: eq_column"
            }));
        }
    };
    let eq_column: String = eq_column_raw.clone();
    let eq_value: String = match query.get("eq_value") {
        Some(v) => v.clone(),
        None => {
            return HttpResponse::BadRequest().json(json!({
                "error": "Missing required parameter: eq_value"
            }));
        }
    };
    // Optional pagination / shaping parameters; unparseable values are
    // silently dropped.
    let columns: Option<String> = query.get("columns").cloned();
    let limit: Option<i32> = query.get("limit").and_then(|l| l.parse::<i32>().ok());
    let current_page: Option<i32> = query
        .get("current_page")
        .and_then(|p| p.parse::<i32>().ok());
    let page_size: Option<i32> = query.get("page_size").and_then(|s| s.parse::<i32>().ok());
    let offset: Option<i32> = query.get("offset").and_then(|o| o.parse::<i32>().ok());
    let total_pages: Option<i32> = query.get("total_pages").and_then(|t| t.parse::<i32>().ok());
    let strip_nulls: bool = query
        .get("strip_nulls")
        .and_then(|s| s.parse::<bool>().ok())
        .unwrap_or(false);
    let normalized_columns_param: Option<String> = columns
        .as_ref()
        .map(|cols| normalize_columns_csv(cols, force_camel_case_to_snake_case));
    // Two cache keys: the historical readable key, plus a hash-suffixed
    // variant that disambiguates full parameter combinations.
    let legacy_cache_key: String = format!(
        "get_data_route:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}:{}",
        view,
        eq_column,
        eq_value,
        normalized_columns_param.as_deref().unwrap_or(""),
        limit.unwrap_or(0),
        current_page.unwrap_or(1),
        page_size.unwrap_or(100),
        offset.unwrap_or(0),
        total_pages.unwrap_or(1),
        strip_nulls,
        client_name
    );
    let get_hash_input: Value = json!({
        "view": view,
        "eq_column": eq_column,
        "eq_value": eq_value,
        "columns": normalized_columns_param.clone(),
        "limit": limit,
        "current_page": current_page,
        "page_size": page_size,
        "offset": offset,
        "total_pages": total_pages,
        "strip_nulls": strip_nulls,
        "client": client_name,
    });
    let get_hash_str: String =
        sha256::digest(serde_json::to_string(&get_hash_input).unwrap_or_default());
    let get_short_hash: &str = &get_hash_str[..8];
    let hashed_cache_key: String = format!("{}-h{}", legacy_cache_key, get_short_hash);
    // Check the hashed key first, then fall back to the legacy key.
    let cache_result_hashed: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, app_state.clone(), &hashed_cache_key).await;
    if let Some(cached_response) = cache_result_hashed {
        return cached_response;
    }
    let cache_result_legacy: Option<HttpResponse> =
        check_cache_control_and_get_response_v2(&req, app_state.clone(), &legacy_cache_key).await;
    if let Some(cached_response) = cache_result_legacy {
        info!(cache_key = %legacy_cache_key, duration_ms = %start_time.elapsed().as_millis(), "cache hit (GET, legacy)");
        return cached_response;
    }
    // Build the equivalent POST body for the proxied fetch.
    let condition: Value = json!({
        "eq_column": eq_column,
        "eq_value": eq_value
    });
    let mut request_body: Value = json!({
        "view_name": view,
        "conditions": [condition]
    });
    if let Some(cols) = normalized_columns_param.clone() {
        request_body["columns"] = json!(cols);
    }
    if let Some(l) = limit {
        request_body["limit"] = json!(l);
    }
    if let Some(cp) = current_page {
        request_body["current_page"] = json!(cp);
    }
    if let Some(ps) = page_size {
        request_body["page_size"] = json!(ps);
    }
    if let Some(o) = offset {
        request_body["offset"] = json!(o);
    }
    if let Some(tp) = total_pages {
        request_body["total_pages"] = json!(tp);
    }
    if strip_nulls {
        request_body["strip_nulls"] = json!(strip_nulls);
    }
    // Proxy to the local gateway, forwarding auth/credential headers.
    let client: Client = Client::new();
    let mut req_builder: RequestBuilder = client
        .post("http://localhost:4052/gateway/fetch")
        .header("Content-Type", "application/json")
        .header(
            "X-User-Id",
            req.headers()
                .get("X-User-Id")
                .and_then(|h| h.to_str().ok())
                .unwrap_or(""),
        )
        .header("X-Athena-Client", &client_name);
    if let Some(api_key) = req.headers().get("apikey").and_then(|h| h.to_str().ok()) {
        req_builder = req_builder.header("apikey", api_key);
    }
    if let Some(api_key) = req.headers().get("x-api-key").and_then(|h| h.to_str().ok()) {
        req_builder = req_builder.header("x-api-key", api_key);
    }
    if let Some(url) = req
        .headers()
        .get("x-supabase-url")
        .and_then(|h| h.to_str().ok())
    {
        req_builder = req_builder.header("x-supabase-url", url);
    }
    if let Some(key) = req
        .headers()
        .get("x-supabase-key")
        .and_then(|h| h.to_str().ok())
    {
        req_builder = req_builder.header("x-supabase-key", key);
    }
    let response: Response = match req_builder.json(&request_body).send().await {
        Ok(resp) => resp,
        Err(e) => {
            error!("Failed to send POST request: {:#?}", e);
            return HttpResponse::InternalServerError().json(json!({
                "error": "Failed to process request"
            }));
        }
    };
    match response.json::<Value>().await {
        Ok(data) => {
            // Hydrate both cache keys so either lookup path hits next time.
            hydrate_cache_and_return_json(
                app_state.clone(),
                hashed_cache_key.clone(),
                vec![json!({"data": data.clone()})],
            )
            .await;
            hydrate_cache_and_return_json(
                app_state.clone(),
                legacy_cache_key.clone(),
                vec![json!({"data": data.clone()})],
            )
            .await;
            log_operation_event(
                Some(app_state.get_ref()),
                &logged_request,
                "fetch_get",
                Some(&view),
                start_time.elapsed().as_millis(),
                StatusCode::OK,
                Some(json!({
                    "legacy_cache_key": legacy_cache_key,
                    "hash": hashed_cache_key
                })),
            );
            HttpResponse::Ok().json(data)
        }
        Err(e) => {
            error!(duration_ms = %start_time.elapsed().as_millis(), error = %e, "fetch GET failed to parse response");
            log_operation_event(
                Some(app_state.get_ref()),
                &logged_request,
                "fetch_get",
                Some(&view),
                start_time.elapsed().as_millis(),
                StatusCode::INTERNAL_SERVER_ERROR,
                Some(json!({ "error": e.to_string() })),
            );
            HttpResponse::InternalServerError().json(json!({
                "error": "Failed to parse response"
            }))
        }
    }
}